diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index e628a253563a1..bf469d7a921d5 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -81,6 +81,10 @@ public class ManagedLedgerConfig { @Getter @Setter private boolean cacheEvictionByMarkDeletedPosition = false; + private boolean supportTwoPhaseDeletion = false; + private int archiveDataLimitSize = 500; + private int retryDeleteIntervalSeconds = 60; + private int maxDeleteCount = 5; public boolean isCreateIfMissing() { return createIfMissing; @@ -683,4 +687,35 @@ public void setInactiveLedgerRollOverTime(int inactiveLedgerRollOverTimeMs, Time this.inactiveLedgerRollOverTimeMs = (int) unit.toMillis(inactiveLedgerRollOverTimeMs); } + public boolean isSupportTwoPhaseDeletion() { + return supportTwoPhaseDeletion; + } + + public void setSupportTwoPhaseDeletion(boolean supportTwoPhaseDeletion) { + this.supportTwoPhaseDeletion = supportTwoPhaseDeletion; + } + + public int getArchiveDataLimitSize() { + return archiveDataLimitSize; + } + + public void setArchiveDataLimitSize(int archiveDataLimitSize) { + this.archiveDataLimitSize = archiveDataLimitSize; + } + + public int getRetryDeleteIntervalSeconds() { + return retryDeleteIntervalSeconds; + } + + public void setRetryDeleteIntervalSeconds(int retryDeleteIntervalSeconds) { + this.retryDeleteIntervalSeconds = retryDeleteIntervalSeconds; + } + + public void setMaxDeleteCount(int maxDeleteCount) { + this.maxDeleteCount = maxDeleteCount; + } + + public int getMaxDeleteCount() { + return maxDeleteCount; + } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedTrash.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedTrash.java new file mode 100644 index 0000000000000..18a1e3a666ca5 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedTrash.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.bookkeeper.mledger; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import org.apache.bookkeeper.mledger.impl.ManagedTrashImpl; +import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; + +public interface ManagedTrash { + + enum ManagedType { + MANAGED_LEDGER("managed-ledger"), + MANAGED_CURSOR("managed-cursor"), + SCHEMA("schema"); + private final String name; + + ManagedType(String name) { + this.name = name; + } + + public String getName() { + return name; + } + } + + enum LedgerType { + BOTH, + OFFLOAD_LEDGER, + LEDGER + } + + /** + * ManagedTrash name. + * + * @return full topic name + type + */ + String name(); + + /** + * Initialize. + */ + CompletableFuture initialize(); + + /** + * Append a ledger that is waiting to be deleted. + * + * @param ledgerId ledgerId + * @param context ledgerInfo; for an offloaded ledger, the offload context is required + * @param type LEDGER or OFFLOAD_LEDGER + * @throws ManagedLedgerException + */ + void appendLedgerTrashData(long ledgerId, LedgerInfo context, LedgerType type) throws ManagedLedgerException; + + /** + * Persist trash data to the meta store. + */ + CompletableFuture asyncUpdateTrashData(); + + /** + * Trigger the deletion procedure. + */ + void triggerDeleteInBackground(); + + /** + * Get all archive indexes; combine with getArchiveData to read the archived details. + */ + CompletableFuture> getAllArchiveIndex(); + + /** + * Get the detail info of archived data. + * + * @param index archive index + * @return + */ + CompletableFuture> getArchiveData(long index); + + /** + * Asynchronously close the managedTrash; it persists the trash data to the meta store. + * @return + */ + CompletableFuture asyncClose(); + + /** + * Asynchronously close the managedTrash, ensuring every ledger has been deleted at least once + * (offload ledgers excluded). + * @return + */ + CompletableFuture asyncCloseAfterAllLedgerDeleteOnce(); +} diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedTrashMXBean.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedTrashMXBean.java new file mode 100644 index 0000000000000..5165b215cbb4f --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedTrashMXBean.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger; + +import org.apache.bookkeeper.common.annotation.InterfaceAudience; +import org.apache.bookkeeper.common.annotation.InterfaceStability; + +/** + * JMX Bean interface for ManagedTrash stats.
+ */ +@InterfaceAudience.LimitedPrivate +@InterfaceStability.Stable +public interface ManagedTrashMXBean { + + String getName(); + + long getCurrentNumberOfLedgersWaitingToDelete(); + + void increaseTotalNumberOfDeleteLedgers(); + + long getTotalNumberOfDeleteLedgers(); + + long getCurrentNumberOfLedgersWaitingToArchive(); + + void increaseTotalNumberOfArchiveLedgers(); + + long getTotalNumberOfArchiveLedgers(); +} diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index 48e85423e8486..5c62188c304b1 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -387,7 +387,7 @@ public void asyncOpen(final String name, final ManagedLedgerConfig config, final bookkeeperFactory.get( new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(), config.getBookKeeperEnsemblePlacementPolicyProperties())), - store, config, scheduledExecutor, name, mlOwnershipChecker); + store, metadataStore, config, scheduledExecutor, name, mlOwnershipChecker); PendingInitializeManagedLedger pendingLedger = new PendingInitializeManagedLedger(newledger); pendingInitializeLedgers.put(name, pendingLedger); newledger.initialize(new ManagedLedgerInitializeLedgerCallback() { @@ -480,7 +480,7 @@ public void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosi bookkeeperFactory .get(new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(), config.getBookKeeperEnsemblePlacementPolicyProperties())), - store, config, scheduledExecutor, managedLedgerName); + store, metadataStore, config, scheduledExecutor, managedLedgerName); roManagedLedger.initializeAndCreateCursor((PositionImpl) startPosition) .thenAccept(roCursor -> callback.openReadOnlyCursorComplete(roCursor, ctx)) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index f3bcb611ca3f4..15c0b25f0e59b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -38,6 +38,7 @@ import java.io.IOException; import java.time.Clock; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -69,6 +70,7 @@ import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.Stream; import lombok.Getter; import org.apache.bookkeeper.client.AsyncCallback; import org.apache.bookkeeper.client.AsyncCallback.CreateCallback; @@ -113,6 +115,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException; import org.apache.bookkeeper.mledger.ManagedLedgerMXBean; +import org.apache.bookkeeper.mledger.ManagedTrash; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.WaitingEntryCallBack; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback; @@ -139,6 +142,7 @@ import org.apache.pulsar.common.util.DateFormatter; import 
org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; +import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -157,6 +161,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { protected ManagedLedgerConfig config; protected Map propertiesMap; protected final MetaStore store; + protected final ManagedTrash managedTrash; final ConcurrentLongHashMap> ledgerCache = ConcurrentLongHashMap.>newBuilder() @@ -304,13 +309,14 @@ public enum PositionBound { Map createdLedgerCustomMetadata; public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, MetaStore store, - ManagedLedgerConfig config, OrderedScheduler scheduledExecutor, - final String name) { - this(factory, bookKeeper, store, config, scheduledExecutor, name, null); + MetadataStore metadataStore, ManagedLedgerConfig config, + OrderedScheduler scheduledExecutor, final String name) { + this(factory, bookKeeper, store, metadataStore, config, scheduledExecutor, name, null); } public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, MetaStore store, - ManagedLedgerConfig config, OrderedScheduler scheduledExecutor, - final String name, final Supplier mlOwnershipChecker) { + MetadataStore metadataStore, ManagedLedgerConfig config, + OrderedScheduler scheduledExecutor, final String name, + final Supplier mlOwnershipChecker) { this.factory = factory; this.bookKeeper = bookKeeper; this.config = config; @@ -343,96 +349,103 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper this.managedLedgerInterceptor = config.getManagedLedgerInterceptor(); } this.inactiveLedgerRollOverTimeMs = config.getInactiveLedgerRollOverTimeMs(); + this.managedTrash = config.isSupportTwoPhaseDeletion() + ? 
new ManagedTrashImpl(ManagedTrash.ManagedType.MANAGED_LEDGER, name, metadataStore, config, + scheduledExecutor, executor, bookKeeper, ledgers) : new ManagedTrashDisableImpl(); } synchronized void initialize(final ManagedLedgerInitializeLedgerCallback callback, final Object ctx) { log.info("Opening managed ledger {}", name); - - // Fetch the list of existing ledgers in the managed ledger + // Fetch the list of existing ledgers in the managed ledger store.getManagedLedgerInfo(name, config.isCreateIfMissing(), config.getProperties(), new MetaStoreCallback() { - @Override - public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) { - ledgersStat = stat; - if (mlInfo.hasTerminatedPosition()) { - state = State.Terminated; - lastConfirmedEntry = new PositionImpl(mlInfo.getTerminatedPosition()); - log.info("[{}] Recovering managed ledger terminated at {}", name, lastConfirmedEntry); - } + @Override + public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) { + ledgersStat = stat; + if (mlInfo.hasTerminatedPosition()) { + state = State.Terminated; + lastConfirmedEntry = new PositionImpl(mlInfo.getTerminatedPosition()); + log.info("[{}] Recovering managed ledger terminated at {}", name, + lastConfirmedEntry); + } - for (LedgerInfo ls : mlInfo.getLedgerInfoList()) { - ledgers.put(ls.getLedgerId(), ls); - } + for (LedgerInfo ls : mlInfo.getLedgerInfoList()) { + ledgers.put(ls.getLedgerId(), ls); + } - if (mlInfo.getPropertiesCount() > 0) { - propertiesMap = Maps.newHashMap(); - for (int i = 0; i < mlInfo.getPropertiesCount(); i++) { - MLDataFormats.KeyValue property = mlInfo.getProperties(i); - propertiesMap.put(property.getKey(), property.getValue()); - } - } - if (managedLedgerInterceptor != null) { - managedLedgerInterceptor.onManagedLedgerPropertiesInitialize(propertiesMap); - } + if (mlInfo.getPropertiesCount() > 0) { + propertiesMap = Maps.newHashMap(); + for (int i = 0; i < mlInfo.getPropertiesCount(); i++) { + MLDataFormats.KeyValue property = mlInfo.getProperties(i); + propertiesMap.put(property.getKey(), property.getValue()); + } + } + if (managedLedgerInterceptor != null) { + managedLedgerInterceptor.onManagedLedgerPropertiesInitialize(propertiesMap); + } + + // Last ledger stat may be zeroed, we must update it + if (!ledgers.isEmpty()) { + final long id = ledgers.lastKey(); + OpenCallback opencb = (rc, lh, ctx1) -> { + executor.executeOrdered(name, safeRun(() -> { + mbean.endDataLedgerOpenOp(); + if (log.isDebugEnabled()) { + log.debug("[{}] Opened ledger {}: {}", name, id, + BKException.getMessage(rc)); + } + if (rc == BKException.Code.OK) { + LedgerInfo info = LedgerInfo.newBuilder().setLedgerId(id) + .setEntries(lh.getLastAddConfirmed() + 1) + .setSize(lh.getLength()).setTimestamp(clock.millis()).build(); + ledgers.put(id, info); + if (managedLedgerInterceptor != null) { + managedLedgerInterceptor.onManagedLedgerLastLedgerInitialize(name, + lh).thenRun(() -> initializeBookKeeper(callback)) + .exceptionally(ex -> { + callback.initializeFailed( + new ManagedLedgerInterceptException( + ex.getCause())); + return null; + }); + } else { + initializeBookKeeper(callback); + } + } else if (isNoSuchLedgerExistsException(rc)) { + log.warn("[{}] Ledger not found: {}", name, id); + ledgers.remove(id); + initializeBookKeeper(callback); + } else { + log.error("[{}] Failed to open ledger {}: {}", name, id, + BKException.getMessage(rc)); + callback.initializeFailed(createManagedLedgerException(rc)); + return; + } + })); + }; - // Last ledger stat may be zeroed, we must update it - if 
(!ledgers.isEmpty()) { - final long id = ledgers.lastKey(); - OpenCallback opencb = (rc, lh, ctx1) -> { - executor.executeOrdered(name, safeRun(() -> { - mbean.endDataLedgerOpenOp(); if (log.isDebugEnabled()) { - log.debug("[{}] Opened ledger {}: {}", name, id, BKException.getMessage(rc)); + log.debug("[{}] Opening ledger {}", name, id); } - if (rc == BKException.Code.OK) { - LedgerInfo info = LedgerInfo.newBuilder().setLedgerId(id) - .setEntries(lh.getLastAddConfirmed() + 1).setSize(lh.getLength()) - .setTimestamp(clock.millis()).build(); - ledgers.put(id, info); - if (managedLedgerInterceptor != null) { - managedLedgerInterceptor.onManagedLedgerLastLedgerInitialize(name, lh) - .thenRun(() -> initializeBookKeeper(callback)) - .exceptionally(ex -> { - callback.initializeFailed( - new ManagedLedgerInterceptException(ex.getCause())); - return null; - }); - } else { - initializeBookKeeper(callback); - } - } else if (isNoSuchLedgerExistsException(rc)) { - log.warn("[{}] Ledger not found: {}", name, id); - ledgers.remove(id); - initializeBookKeeper(callback); - } else { - log.error("[{}] Failed to open ledger {}: {}", name, id, BKException.getMessage(rc)); - callback.initializeFailed(createManagedLedgerException(rc)); - return; - } - })); - }; - - if (log.isDebugEnabled()) { - log.debug("[{}] Opening ledger {}", name, id); + mbean.startDataLedgerOpenOp(); + bookKeeper.asyncOpenLedger(id, digestType, config.getPassword(), opencb, null); + } else { + initializeBookKeeper(callback); + } } - mbean.startDataLedgerOpenOp(); - bookKeeper.asyncOpenLedger(id, digestType, config.getPassword(), opencb, null); - } else { - initializeBookKeeper(callback); - } - } - @Override - public void operationFailed(MetaStoreException e) { - if (e instanceof MetadataNotFoundException) { - callback.initializeFailed(new ManagedLedgerNotFoundException(e)); - } else { - callback.initializeFailed(new ManagedLedgerException(e)); - } - } - }); + @Override + public void operationFailed(MetaStoreException e) { + if (e instanceof MetadataNotFoundException) { + callback.initializeFailed(new ManagedLedgerNotFoundException(e)); + } else { + callback.initializeFailed(new ManagedLedgerException(e)); + } + } + }); scheduleTimeoutTask(); + } private synchronized void initializeBookKeeper(final ManagedLedgerInitializeLedgerCallback callback) { @@ -456,12 +469,22 @@ private synchronized void initializeBookKeeper(final ManagedLedgerInitializeLedg }, null); } } - if (state == State.Terminated) { // When recovering a terminated managed ledger, we don't need to create // a new ledger for writing, since no more writes are allowed. 
// We just move on to the next stage - initializeCursors(callback); + List> futures = new ArrayList<>(2); + futures.add(initializeCursors()); + futures.add(managedTrash.initialize()); + + FutureUtil.waitForAll(futures).whenComplete((res, e) -> { + if (e != null) { + callback.initializeFailed((ManagedLedgerException) e); + return; + } + callback.initializeComplete(); + }); + return; } @@ -469,7 +492,17 @@ private synchronized void initializeBookKeeper(final ManagedLedgerInitializeLedg @Override public void operationComplete(Void v, Stat stat) { ledgersStat = stat; - initializeCursors(callback); + List> futures = new ArrayList<>(2); + futures.add(initializeCursors()); + futures.add(managedTrash.initialize()); + + FutureUtil.waitForAll(futures).whenComplete((res, e) -> { + if (e != null) { + callback.initializeFailed((ManagedLedgerException) e); + return; + } + callback.initializeComplete(); + }); } @Override @@ -521,7 +554,8 @@ public void operationFailed(MetaStoreException e) { }, ledgerMetadata); } - private void initializeCursors(final ManagedLedgerInitializeLedgerCallback callback) { + private CompletableFuture initializeCursors() { + CompletableFuture future = new CompletableFuture<>(); if (log.isDebugEnabled()) { log.debug("[{}] initializing cursors", name); } @@ -535,7 +569,7 @@ public void operationComplete(List consumers, Stat s) { } if (consumers.isEmpty()) { - callback.initializeComplete(); + future.complete(null); return; } @@ -557,7 +591,7 @@ public void operationComplete() { if (cursorCount.decrementAndGet() == 0) { // The initialization is now completed, register the jmx mbean - callback.initializeComplete(); + future.complete(null); } } @@ -565,7 +599,7 @@ public void operationComplete() { public void operationFailed(ManagedLedgerException exception) { log.warn("[{}] Recovery for cursor {} failed", name, cursorName, exception); cursorCount.set(-1); - callback.initializeFailed(exception); + future.completeExceptionally(exception); } }); } @@ -602,16 +636,17 @@ public void operationFailed(ManagedLedgerException exception) { }); } // Complete ledger recovery. 
- callback.initializeComplete(); + future.complete(null); } } @Override public void operationFailed(MetaStoreException e) { log.warn("[{}] Failed to get the cursors list", name, e); - callback.initializeFailed(new ManagedLedgerException(e)); + future.completeExceptionally(new ManagedLedgerException(e)); } }); + return future; } @Override @@ -1405,38 +1440,49 @@ public synchronized void asyncClose(final CloseCallback callback, final Object c LedgerHandle lh = currentLedger; - if (lh == null) { - // No ledger to close, proceed with next step - closeAllCursors(callback, ctx); - return; - } + List> futures = new ArrayList<>(); + futures.add(managedTrash.asyncClose()); - if (log.isDebugEnabled()) { - log.debug("[{}] Closing current writing ledger {}", name, lh.getId()); - } + CompletableFuture closeCursorFuture = new CompletableFuture<>(); + futures.add(closeCursorFuture); - mbean.startDataLedgerCloseOp(); - lh.asyncClose((rc, lh1, ctx1) -> { + if (lh == null) { + // No ledger to close, proceed with next step + closeAllCursors(closeCursorFuture); + } else { if (log.isDebugEnabled()) { - log.debug("[{}] Close complete for ledger {}: rc = {}", name, lh.getId(), rc); - } - mbean.endDataLedgerCloseOp(); - if (rc != BKException.Code.OK) { - callback.closeFailed(createManagedLedgerException(rc), ctx); - return; + log.debug("[{}] Closing current writing ledger {}", name, lh.getId()); } - ledgerCache.forEach((ledgerId, readHandle) -> { - invalidateReadHandle(ledgerId); - }); + mbean.startDataLedgerCloseOp(); + lh.asyncClose((rc, lh1, ctx1) -> { + if (log.isDebugEnabled()) { + log.debug("[{}] Close complete for ledger {}: rc = {}", name, lh.getId(), rc); + } + mbean.endDataLedgerCloseOp(); + if (rc != BKException.Code.OK) { + closeCursorFuture.completeExceptionally(createManagedLedgerException(rc)); + return; + } - closeAllCursors(callback, ctx); - }, null); + ledgerCache.forEach((ledgerId, readHandle) -> { + invalidateReadHandle(ledgerId); + }); + closeAllCursors(closeCursorFuture); + }, null); + } + + FutureUtil.waitForAll(futures).thenAccept(ignore -> { + callback.closeComplete(ctx); + }).exceptionally(e -> { + callback.closeFailed(ManagedLedgerException.getManagedLedgerException(e), ctx); + return null; + }); } - private void closeAllCursors(CloseCallback callback, final Object ctx) { + private void closeAllCursors(CompletableFuture future) { // Close all cursors in parallel List> futures = Lists.newArrayList(); for (ManagedCursor cursor : cursors) { @@ -1446,11 +1492,12 @@ private void closeAllCursors(CloseCallback callback, final Object ctx) { } Futures.waitForAll(futures) - .thenRun(() -> callback.closeComplete(ctx)) + .thenRun(() -> future.complete(null)) .exceptionally(exception -> { - callback.closeFailed(ManagedLedgerException.getManagedLedgerException(exception.getCause()), ctx); - return null; - }); + future.completeExceptionally( + ManagedLedgerException.getManagedLedgerException(exception.getCause())); + return null; + }); } // ////////////////////////////////////////////////////////////////////// @@ -2488,7 +2535,7 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture promise) { break; } // if truncate, all ledgers besides currentLedger are going to be deleted - if (isTruncate){ + if (isTruncate) { if (log.isDebugEnabled()) { log.debug("[{}] Ledger {} will be truncated with ts {}", name, ls.getLedgerId(), ls.getTimestamp()); @@ -2549,70 +2596,112 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture promise) { advanceCursorsIfNecessary(ledgersToDelete); 
PositionImpl currentLastConfirmedEntry = lastConfirmedEntry; - // Update metadata - for (LedgerInfo ls : ledgersToDelete) { - if (currentLastConfirmedEntry != null && ls.getLedgerId() == currentLastConfirmedEntry.getLedgerId()) { - // this info is relevant because the lastMessageId won't be available anymore - log.info("[{}] Ledger {} contains the current last confirmed entry {}, and it is going to be " - + "deleted", name, ls.getLedgerId(), currentLastConfirmedEntry); - } - invalidateReadHandle(ls.getLedgerId()); - - ledgers.remove(ls.getLedgerId()); - NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, -ls.getEntries()); - TOTAL_SIZE_UPDATER.addAndGet(this, -ls.getSize()); + // Update metadata + // Mark deletable ledgers + Set deletableLedgers = + Stream.concat(ledgersToDelete.stream().filter(ls -> !ls.getOffloadContext().getBookkeeperDeleted()), + offloadedLedgersToDelete.stream()).map(LedgerInfo::getLedgerId).collect(Collectors.toSet()); + + // Mark deletable offloaded ledgers + Set deletableOffloadedLedgers = ledgersToDelete.stream() + .filter(ls -> ls.getOffloadContext().hasUidMsb()) + .map(LedgerInfo::getLedgerId).collect(Collectors.toSet()); + + CompletableFuture updateTrashFuture = + asyncUpdateTrashData(deletableLedgers, deletableOffloadedLedgers); + updateTrashFuture.thenAccept(ignore -> { + for (LedgerInfo ls : ledgersToDelete) { + if (currentLastConfirmedEntry != null + && ls.getLedgerId() == currentLastConfirmedEntry.getLedgerId()) { + // this info is relevant because the lastMessageId won't be available anymore + log.info("[{}] Ledger {} contains the current last confirmed entry {}, and it is going to be " + + "deleted", name, ls.getLedgerId(), currentLastConfirmedEntry); + } - entryCache.invalidateAllEntries(ls.getLedgerId()); - } - for (LedgerInfo ls : offloadedLedgersToDelete) { - LedgerInfo.Builder newInfoBuilder = ls.toBuilder(); - newInfoBuilder.getOffloadContextBuilder().setBookkeeperDeleted(true); - String driverName = OffloadUtils.getOffloadDriverName(ls, - config.getLedgerOffloader().getOffloadDriverName()); - Map driverMetadata = OffloadUtils.getOffloadDriverMetadata(ls, - config.getLedgerOffloader().getOffloadDriverMetadata()); - OffloadUtils.setOffloadDriverMetadata(newInfoBuilder, driverName, driverMetadata); - ledgers.put(ls.getLedgerId(), newInfoBuilder.build()); - } + invalidateReadHandle(ls.getLedgerId()); + ledgers.remove(ls.getLedgerId()); + entryCache.invalidateAllEntries(ls.getLedgerId()); - if (log.isDebugEnabled()) { - log.debug("[{}] Updating of ledgers list after trimming", name); - } + NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, -ls.getEntries()); + TOTAL_SIZE_UPDATER.addAndGet(this, -ls.getSize()); + } + for (LedgerInfo ls : offloadedLedgersToDelete) { + LedgerInfo.Builder newInfoBuilder = ls.toBuilder(); + newInfoBuilder.getOffloadContextBuilder().setBookkeeperDeleted(true); + String driverName = OffloadUtils.getOffloadDriverName(ls, + config.getLedgerOffloader().getOffloadDriverName()); + Map driverMetadata = OffloadUtils.getOffloadDriverMetadata(ls, + config.getLedgerOffloader().getOffloadDriverMetadata()); + OffloadUtils.setOffloadDriverMetadata(newInfoBuilder, driverName, driverMetadata); + ledgers.put(ls.getLedgerId(), newInfoBuilder.build()); + } - store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, new MetaStoreCallback() { - @Override - public void operationComplete(Void result, Stat stat) { - log.info("[{}] End TrimConsumedLedgers. 
ledgers={} totalSize={}", name, ledgers.size(), - TOTAL_SIZE_UPDATER.get(ManagedLedgerImpl.this)); - ledgersStat = stat; - metadataMutex.unlock(); - trimmerMutex.unlock(); + if (log.isDebugEnabled()) { + log.debug("[{}] Updating of ledgers list after trimming", name); + } - for (LedgerInfo ls : ledgersToDelete) { - log.info("[{}] Removing ledger {} - size: {}", name, ls.getLedgerId(), ls.getSize()); - asyncDeleteLedger(ls.getLedgerId(), ls); - } - for (LedgerInfo ls : offloadedLedgersToDelete) { - log.info("[{}] Deleting offloaded ledger {} from bookkeeper - size: {}", name, ls.getLedgerId(), - ls.getSize()); - asyncDeleteLedgerFromBookKeeper(ls.getLedgerId()); + store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, new MetaStoreCallback() { + @Override + public void operationComplete(Void result, Stat stat) { + log.info("[{}] End TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.size(), + TOTAL_SIZE_UPDATER.get(ManagedLedgerImpl.this)); + ledgersStat = stat; + metadataMutex.unlock(); + trimmerMutex.unlock(); + if (managedTrash instanceof ManagedTrashDisableImpl) { + for (LedgerInfo ls : ledgersToDelete) { + log.info("[{}] Removing ledger {} - size: {}", name, ls.getLedgerId(), ls.getSize()); + asyncDeleteLedger(ls.getLedgerId(), ls); + } + for (LedgerInfo ls : offloadedLedgersToDelete) { + log.info("[{}] Deleting offloaded ledger {} from bookkeeper - size: {}", name, + ls.getLedgerId(), + ls.getSize()); + asyncDeleteLedgerFromBookKeeper(ls.getLedgerId()); + } + } else { + managedTrash.triggerDeleteInBackground(); + } + promise.complete(null); } - promise.complete(null); - } - @Override - public void operationFailed(MetaStoreException e) { - log.warn("[{}] Failed to update the list of ledgers after trimming", name, e); - metadataMutex.unlock(); - trimmerMutex.unlock(); + @Override + public void operationFailed(MetaStoreException e) { + log.warn("[{}] Failed to update the list of ledgers after trimming", name, e); + metadataMutex.unlock(); + trimmerMutex.unlock(); - promise.completeExceptionally(e); - } + promise.completeExceptionally(e); + } + }); + }).exceptionally(ex -> { + metadataMutex.unlock(); + trimmerMutex.unlock(); + promise.completeExceptionally(ex); + return null; }); } } + private CompletableFuture asyncUpdateTrashData(Collection deletableLedgerIds, + Collection deletableOffloadedLedgerIds) { + CompletableFuture future = new CompletableFuture<>(); + try { + for (Long ledgerId : deletableLedgerIds) { + managedTrash.appendLedgerTrashData(ledgerId, null, ManagedTrash.LedgerType.LEDGER); + } + for (Long ledgerId : deletableOffloadedLedgerIds) { + managedTrash.appendLedgerTrashData(ledgerId, ledgers.get(ledgerId), + ManagedTrash.LedgerType.OFFLOAD_LEDGER); + } + future.complete(null); + } catch (ManagedLedgerException e) { + future.completeExceptionally(e); + } + return future.thenCompose(ignore -> managedTrash.asyncUpdateTrashData()); + } + /** * Non-durable cursors have to be moved forward when data is trimmed since they are not retain that data. * This method also addresses a corner case for durable cursors in which the cursor is caught up, i.e. 
the mark @@ -2696,6 +2785,7 @@ public void deleteLedgerFailed(ManagedLedgerException e, Object ctx) { public void asyncDelete(final DeleteLedgerCallback callback, final Object ctx) { // Delete the managed ledger without closing, since we are not interested in gracefully closing cursors and // ledgers + STATE_UPDATER.set(this, State.Fenced); cancelScheduledTasks(); @@ -2786,61 +2876,99 @@ private void asyncDeleteLedger(long ledgerId, long retry) { @SuppressWarnings("checkstyle:fallthrough") private void deleteAllLedgers(DeleteLedgerCallback callback, Object ctx) { List ledgers = Lists.newArrayList(ManagedLedgerImpl.this.ledgers.values()); - AtomicInteger ledgersToDelete = new AtomicInteger(ledgers.size()); if (ledgers.isEmpty()) { // No ledgers to delete, proceed with deleting metadata - deleteMetadata(callback, ctx); + deleteMetadata().whenComplete((res, e) -> { + if (e != null) { + callback.deleteLedgerFailed((ManagedLedgerException) e, ctx); + return; + } + callback.deleteLedgerComplete(ctx); + }); return; } - - for (LedgerInfo ls : ledgers) { - if (log.isDebugEnabled()) { - log.debug("[{}] Deleting ledger {}", name, ls); - } - bookKeeper.asyncDeleteLedger(ls.getLedgerId(), (rc, ctx1) -> { - switch (rc) { - case Code.NoSuchLedgerExistsException: - case Code.NoSuchLedgerExistsOnMetadataServerException: - log.warn("[{}] Ledger {} not found when deleting it", name, ls.getLedgerId()); - // Continue anyway - - case BKException.Code.OK: - if (ledgersToDelete.decrementAndGet() == 0) { - // All ledgers deleted, now remove ML metadata - deleteMetadata(callback, ctx); - } - break; - - default: - // Handle error - log.warn("[{}] Failed to delete ledger {} -- {}", name, ls.getLedgerId(), - BKException.getMessage(rc)); - int toDelete = ledgersToDelete.get(); - if (toDelete != -1 && ledgersToDelete.compareAndSet(toDelete, -1)) { - // Trigger callback only once - callback.deleteLedgerFailed(createManagedLedgerException(rc), ctx); + if (managedTrash instanceof ManagedTrashDisableImpl) { + AtomicInteger ledgersToDelete = new AtomicInteger(ledgers.size()); + for (LedgerInfo ls : ledgers) { + if (log.isDebugEnabled()) { + log.debug("[{}] Deleting ledger {}", name, ls); + } + bookKeeper.asyncDeleteLedger(ls.getLedgerId(), (rc, ctx1) -> { + switch (rc) { + case Code.NoSuchLedgerExistsException: + case Code.NoSuchLedgerExistsOnMetadataServerException: + log.warn("[{}] Ledger {} not found when deleting it", name, ls.getLedgerId()); + // Continue anyway + + case BKException.Code.OK: + if (ledgersToDelete.decrementAndGet() == 0) { + // All ledgers deleted, now remove ML metadata + deleteMetadata().whenComplete((res, e) -> { + if (e != null) { + callback.deleteLedgerFailed((ManagedLedgerException) e, ctx); + return; + } + callback.deleteLedgerComplete(ctx); + }); + } + break; + + default: + // Handle error + log.warn("[{}] Failed to delete ledger {} -- {}", name, ls.getLedgerId(), + BKException.getMessage(rc)); + int toDelete = ledgersToDelete.get(); + if (toDelete != -1 && ledgersToDelete.compareAndSet(toDelete, -1)) { + // Trigger callback only once + callback.deleteLedgerFailed(createManagedLedgerException(rc), ctx); + } } + }, null); + } + } else { + // Mark deletable ledgers + Set deletableLedgers = ledgers.stream().filter(ls -> !ls.getOffloadContext().getBookkeeperDeleted()) + .map(LedgerInfo::getLedgerId).collect(Collectors.toSet()); + + // Mark deletable offloaded ledgers + Set deletableOffloadedLedgers = ledgers.stream() + .filter(ls -> ls.getOffloadContext().hasUidMsb()) + 
.map(LedgerInfo::getLedgerId).collect(Collectors.toSet()); + + asyncUpdateTrashData(deletableLedgers, deletableOffloadedLedgers).thenCompose(ignore -> { + managedTrash.triggerDeleteInBackground(); + return deleteMetadata(); + }).thenAccept(ignore -> managedTrash.asyncCloseAfterAllLedgerDeleteOnce().whenComplete((res, e) -> { + if (e != null) { + log.error("[{}] ManagedTrash failed to delete all trash data at least once.", name, e); + } + callback.deleteLedgerComplete(ctx); + })).exceptionally(e -> { + callback.deleteLedgerFailed(createManagedLedgerException(e), ctx); + return null; + }); + } } - private void deleteMetadata(DeleteLedgerCallback callback, Object ctx) { + private CompletableFuture deleteMetadata() { + CompletableFuture future = new CompletableFuture<>(); store.removeManagedLedger(name, new MetaStoreCallback() { @Override public void operationComplete(Void result, Stat stat) { log.info("[{}] Successfully deleted managed ledger", name); factory.close(ManagedLedgerImpl.this); - callback.deleteLedgerComplete(ctx); + ledgers.clear(); + future.complete(null); } @Override public void operationFailed(MetaStoreException e) { log.warn("[{}] Failed to delete managed ledger", name, e); factory.close(ManagedLedgerImpl.this); - callback.deleteLedgerFailed(e, ctx); + future.completeExceptionally(e); } }); + return future; } @Override @@ -2980,26 +3108,37 @@ private void offloadLoop(CompletableFuture promise, Queue driverMetadata = config.getLedgerOffloader().getOffloadDriverMetadata(); prepareLedgerInfoForOffloaded(ledgerId, uuid, driverName, driverMetadata) - .thenCompose((ignore) -> getLedgerHandle(ledgerId)) - .thenCompose(readHandle -> config.getLedgerOffloader().offload(readHandle, uuid, extraMetadata)) - .thenCompose((ignore) -> { + .thenCompose((ignore) -> getLedgerHandle(ledgerId)) + .thenCompose(readHandle -> config.getLedgerOffloader().offload(readHandle, uuid, extraMetadata)) + .thenCompose((ignore) -> { return Retries.run(Backoff.exponentialJittered(TimeUnit.SECONDS.toMillis(1), - TimeUnit.SECONDS.toHours(1)).limit(10), - FAIL_ON_CONFLICT, - () -> completeLedgerInfoForOffloaded(ledgerId, uuid), - scheduledExecutor, name) - .whenComplete((ignore2, exception) -> { + TimeUnit.SECONDS.toHours(1)).limit(10), + FAIL_ON_CONFLICT, + () -> completeLedgerInfoForOffloaded(ledgerId, uuid), + scheduledExecutor, name) + .whenComplete((ignore2, exception) -> { if (exception != null) { log.error("[{}] Failed to offload data for the ledgerId {}", name, ledgerId, exception); - cleanupOffloaded( - ledgerId, uuid, - driverName, driverMetadata, - "Metastore failure"); + // Do not delete the offloaded ledger here; it will be deleted at the next offload.
+ if (managedTrash instanceof ManagedTrashDisableImpl) { + cleanupOffloaded(ledgerId, uuid, driverName, driverMetadata, + "Metastore failure"); + } else { + try { + managedTrash.appendLedgerTrashData(ledgerId, ledgers.get(ledgerId), + ManagedTrash.LedgerType.OFFLOAD_LEDGER); + managedTrash.asyncUpdateTrashData().thenAccept(ignore1 -> { + managedTrash.triggerDeleteInBackground(); + }); + } catch (ManagedLedgerException e) { + log.warn("[{}]-{} Failed to append trash data.", this.name, ledgerId); + } + } } }); }) - .whenComplete((ignore, exception) -> { + .whenComplete((ignore, exception) -> { if (exception != null) { lastOffloadFailureTimestamp = System.currentTimeMillis(); log.warn("[{}] Exception occurred for ledgerId {} timestamp {} during offload", name, @@ -3017,8 +3156,8 @@ private void offloadLoop(CompletableFuture promise, Queue prepareLedgerInfoForOffloaded(long ledgerId, UUID uuid, String offloadDriverName, - Map offloadDriverMetadata) { + Map offloadDriverMetadata) { log.info("[{}] Preparing metadata to offload ledger {} with uuid {}", name, ledgerId, uuid); return transformLedgerInfo(ledgerId, - (oldInfo) -> { - if (oldInfo.getOffloadContext().hasUidMsb()) { - UUID oldUuid = new UUID(oldInfo.getOffloadContext().getUidMsb(), - oldInfo.getOffloadContext().getUidLsb()); - log.info("[{}] Found previous offload attempt for ledger {}, uuid {}" - + ", cleaning up", name, ledgerId, uuid); - cleanupOffloaded( - ledgerId, - oldUuid, - OffloadUtils.getOffloadDriverName(oldInfo, - config.getLedgerOffloader().getOffloadDriverName()), - OffloadUtils.getOffloadDriverMetadata(oldInfo, - config.getLedgerOffloader().getOffloadDriverMetadata()), - "Previous failed offload"); - } - LedgerInfo.Builder builder = oldInfo.toBuilder(); - builder.getOffloadContextBuilder() - .setUidMsb(uuid.getMostSignificantBits()) - .setUidLsb(uuid.getLeastSignificantBits()); - OffloadUtils.setOffloadDriverMetadata( - builder, - offloadDriverName, - offloadDriverMetadata - ); - return builder.build(); - }) - .whenComplete((result, exception) -> { + (oldInfo) -> { + if (oldInfo.getOffloadContext().hasUidMsb()) { + UUID oldUuid = new UUID(oldInfo.getOffloadContext().getUidMsb(), + oldInfo.getOffloadContext().getUidLsb()); + log.info("[{}] Found previous offload attempt for ledger {}, uuid {} old uuid {}" + + ", cleaning up", name, ledgerId, uuid, oldUuid); + if (managedTrash instanceof ManagedTrashDisableImpl) { + cleanupOffloaded( + ledgerId, + oldUuid, + OffloadUtils.getOffloadDriverName(oldInfo, + config.getLedgerOffloader().getOffloadDriverName()), + OffloadUtils.getOffloadDriverMetadata(oldInfo, + config.getLedgerOffloader().getOffloadDriverMetadata()), + "Previous failed offload"); + } else { + managedTrash.appendLedgerTrashData(ledgerId, oldInfo, + ManagedTrash.LedgerType.OFFLOAD_LEDGER); + managedTrash.asyncUpdateTrashData().thenAccept(ignore -> { + managedTrash.triggerDeleteInBackground(); + }); + } + } + LedgerInfo.Builder builder = oldInfo.toBuilder(); + builder.getOffloadContextBuilder() + .setUidMsb(uuid.getMostSignificantBits()) + .setUidLsb(uuid.getLeastSignificantBits()); + OffloadUtils.setOffloadDriverMetadata( + builder, + offloadDriverName, + offloadDriverMetadata + ); + return builder.build(); + }) + .whenComplete((result, exception) -> { if (exception != null) { log.warn("[{}] Failed to prepare ledger {} for offload, uuid {}", - name, ledgerId, uuid, exception); + name, ledgerId, uuid, exception); } else { log.info("[{}] Metadata prepared for offload of ledger {} with uuid {}", name, ledgerId, uuid); 
} @@ -3146,38 +3293,38 @@ private CompletableFuture prepareLedgerInfoForOffloaded(long ledgerId, UUI private CompletableFuture completeLedgerInfoForOffloaded(long ledgerId, UUID uuid) { log.info("[{}] Completing metadata for offload of ledger {} with uuid {}", name, ledgerId, uuid); return transformLedgerInfo(ledgerId, - (oldInfo) -> { - UUID existingUuid = new UUID(oldInfo.getOffloadContext().getUidMsb(), - oldInfo.getOffloadContext().getUidLsb()); - if (existingUuid.equals(uuid)) { - LedgerInfo.Builder builder = oldInfo.toBuilder(); - builder.getOffloadContextBuilder() - .setTimestamp(clock.millis()) - .setComplete(true); - - String driverName = OffloadUtils.getOffloadDriverName( - oldInfo, config.getLedgerOffloader().getOffloadDriverName()); - Map driverMetadata = OffloadUtils.getOffloadDriverMetadata( - oldInfo, config.getLedgerOffloader().getOffloadDriverMetadata()); - OffloadUtils.setOffloadDriverMetadata( - builder, - driverName, - driverMetadata - ); - return builder.build(); - } else { - throw new OffloadConflict( - "Existing UUID(" + existingUuid + ") in metadata for offload" - + " of ledgerId " + ledgerId + " does not match the UUID(" + uuid - + ") for the offload we are trying to complete"); - } - }) - .whenComplete((result, exception) -> { + (oldInfo) -> { + UUID existingUuid = new UUID(oldInfo.getOffloadContext().getUidMsb(), + oldInfo.getOffloadContext().getUidLsb()); + if (existingUuid.equals(uuid)) { + LedgerInfo.Builder builder = oldInfo.toBuilder(); + builder.getOffloadContextBuilder() + .setTimestamp(clock.millis()) + .setComplete(true); + + String driverName = OffloadUtils.getOffloadDriverName( + oldInfo, config.getLedgerOffloader().getOffloadDriverName()); + Map driverMetadata = OffloadUtils.getOffloadDriverMetadata( + oldInfo, config.getLedgerOffloader().getOffloadDriverMetadata()); + OffloadUtils.setOffloadDriverMetadata( + builder, + driverName, + driverMetadata + ); + return builder.build(); + } else { + throw new OffloadConflict( + "Existing UUID(" + existingUuid + ") in metadata for offload" + + " of ledgerId " + ledgerId + " does not match the UUID(" + uuid + + ") for the offload we are trying to complete"); + } + }) + .whenComplete((result, exception) -> { if (exception == null) { log.info("[{}] End Offload. 
ledger={}, uuid={}", name, ledgerId, uuid); } else { log.warn("[{}] Failed to complete offload of ledger {}, uuid {}", - name, ledgerId, uuid, exception); + name, ledgerId, uuid, exception); } }); } @@ -3795,7 +3942,18 @@ protected boolean checkAndCompleteLedgerOpTask(int rc, LedgerHandle lh, Object c } else { if (rc == BKException.Code.OK) { log.warn("[{}]-{} ledger creation timed-out, deleting ledger", this.name, lh.getId()); - asyncDeleteLedger(lh.getId(), DEFAULT_LEDGER_DELETE_RETRIES); + if (managedTrash instanceof ManagedTrashDisableImpl) { + asyncDeleteLedger(lh.getId(), DEFAULT_LEDGER_DELETE_RETRIES); + } else { + try { + managedTrash.appendLedgerTrashData(lh.getId(), null, ManagedTrash.LedgerType.LEDGER); + managedTrash.asyncUpdateTrashData().thenAccept(ignore -> { + managedTrash.triggerDeleteInBackground(); + }); + } catch (ManagedLedgerException e) { + log.warn("[{}]-{} Failed to append trash data.", this.name, lh.getId()); + } + } } return true; } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedTrashDisableImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedTrashDisableImpl.java new file mode 100644 index 0000000000000..734a88e888c3a --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedTrashDisableImpl.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.bookkeeper.mledger.impl; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedTrash; +import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; + +public class ManagedTrashDisableImpl implements ManagedTrash { + + private static final CompletableFuture COMPLETABLE_FUTURE = CompletableFuture.completedFuture(null); + + @Override + public String name() { + return ""; + } + + @Override + public CompletableFuture initialize() { + return COMPLETABLE_FUTURE; + } + + @Override + public void appendLedgerTrashData(long ledgerId, LedgerInfo context, LedgerType type) + throws ManagedLedgerException { + } + + @Override + public CompletableFuture asyncUpdateTrashData() { + return COMPLETABLE_FUTURE; + } + + @Override + public void triggerDeleteInBackground() { + } + + @Override + public CompletableFuture> getAllArchiveIndex() { + return (CompletableFuture>) COMPLETABLE_FUTURE; + } + + @Override + public CompletableFuture> getArchiveData(long index) { + return (CompletableFuture>) COMPLETABLE_FUTURE; + } + + @Override + public CompletableFuture asyncClose() { + return COMPLETABLE_FUTURE; + } + + @Override + public CompletableFuture asyncCloseAfterAllLedgerDeleteOnce() { + return COMPLETABLE_FUTURE; + } +} diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedTrashImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedTrashImpl.java new file mode 100644 index 0000000000000..55e835378917b --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedTrashImpl.java @@ -0,0 +1,892 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/ +package org.apache.bookkeeper.mledger.impl; + +import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException; +import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; +import com.google.common.collect.Maps; +import com.google.protobuf.InvalidProtocolBufferException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.common.util.OrderedExecutor; +import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedTrash; +import org.apache.bookkeeper.mledger.ManagedTrashMXBean; +import org.apache.bookkeeper.mledger.proto.MLDataFormats; +import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; +import org.apache.bookkeeper.mledger.proto.MLDataFormats.TrashDataComponent; +import org.apache.bookkeeper.mledger.util.CallbackMutex; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.metadata.api.MetadataStore; +import org.apache.pulsar.metadata.api.MetadataStoreException; +import org.apache.pulsar.metadata.api.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ManagedTrashImpl implements ManagedTrash { + + private static final Logger log = LoggerFactory.getLogger(ManagedTrashImpl.class); + + private static final String BASE_NODE = "/managed-trash"; + + private static final String PREFIX = BASE_NODE + "/"; + + private static final String DELETE_SUFFIX = "/delete"; + + private static final String ARCHIVE = "archive-"; + + private static final String ARCHIVE_SUFFIX = "/" + ARCHIVE; + + private static final String TRASH_KEY_SEPARATOR = ";"; + + private static final long EMPTY_LEDGER_ID = -1L; + + private static final LedgerInfo EMPTY_LEDGER_INFO = LedgerInfo.newBuilder().setLedgerId(EMPTY_LEDGER_ID).build(); + + private static final AtomicReferenceFieldUpdater STATE_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(ManagedTrashImpl.class, ManagedTrashImpl.State.class, "state"); + + protected volatile ManagedTrashImpl.State state = null; + + private NavigableMap trashData = new ConcurrentSkipListMap<>(); + + //TODO: should trashData entries with leftRetryCount == 0 that have not yet reached the archive limit be maintained in a separate node? + + private final AtomicInteger toArchiveCount = new AtomicInteger(); + + private final CallbackMutex deleteMutex = new CallbackMutex(); + + private final CallbackMutex trashMutex = new CallbackMutex(); + + private final CallbackMutex archiveMutex = new CallbackMutex(); + + private final MetadataStore metadataStore; + + private volatile Stat deleteStat; + + private final AtomicInteger continueDeleteImmediately = new AtomicInteger(); + + private final String type; + + private final String name; + + private final ManagedLedgerConfig config; + + private final OrderedScheduler scheduledExecutor; + + private final OrderedExecutor
executor; + + private final BookKeeper bookKeeper; + + private final int archiveDataLimitSize; + + private final long retryDeleteIntervalMillis; + + private final long nextDeleteDelayMillis; + + private final int maxDeleteCount; + + private volatile boolean trashIsDirty; + + private ScheduledFuture checkTrashPersistTask; + + private final ManagedTrashMXBean managedTrashMXBean; + + private final NavigableMap managedLedgers; + + public ManagedTrashImpl(ManagedType type, String name, MetadataStore metadataStore, ManagedLedgerConfig config, + OrderedScheduler scheduledExecutor, OrderedExecutor executor, BookKeeper bookKeeper, + NavigableMap managedLedgers) { + this.type = type.getName(); + this.name = name; + this.config = config; + this.metadataStore = metadataStore; + this.managedLedgers = managedLedgers; + this.scheduledExecutor = scheduledExecutor; + this.executor = executor; + this.bookKeeper = bookKeeper; + this.archiveDataLimitSize = config.getArchiveDataLimitSize(); + this.retryDeleteIntervalMillis = TimeUnit.SECONDS.toMillis(config.getRetryDeleteIntervalSeconds()); + this.nextDeleteDelayMillis = calculateNextDeleteDelayMillis(this.retryDeleteIntervalMillis); + this.maxDeleteCount = Integer.max(1, config.getMaxDeleteCount()); + this.managedTrashMXBean = new ManagedTrashMXBeanImpl(this); + STATE_UPDATER.set(this, State.None); + } + + protected long calculateNextDeleteDelayMillis(long retryDeleteIntervalMillis) { + return retryDeleteIntervalMillis / 5; + } + + @Override + public String name() { + return name + "-" + type; + } + + @Override + public CompletableFuture initialize() { + CompletableFuture future = new CompletableFuture<>(); + metadataStore.get(buildDeletePath()).whenCompleteAsync((res, e) -> { + if (e != null) { + log.error("[{}] Get delete data failed.", name(), e); + future.completeExceptionally(ManagedLedgerException.getManagedLedgerException(e)); + return; + } + if (res.isEmpty()) { + STATE_UPDATER.set(this, State.Initialized); + checkTrashPersistTask = + scheduledExecutor.scheduleAtFixedRate(safeRun(this::persistTrashIfNecessary), 30L, 30L, + TimeUnit.MINUTES); + future.complete(null); + return; + } + byte[] value = res.get().getValue(); + try { + trashData.putAll(deSerialize(value)); + deleteStat = res.get().getStat(); + toArchiveCount.set(calculateArchiveCount()); + future.complete(null); + checkTrashPersistTask = + scheduledExecutor.scheduleAtFixedRate(safeRun(this::persistTrashIfNecessary), 30L, 30L, + TimeUnit.MINUTES); + STATE_UPDATER.set(this, State.Initialized); + triggerDeleteInBackground(); + } catch (InvalidProtocolBufferException exc) { + future.completeExceptionally(ManagedLedgerException.getManagedLedgerException(exc)); + } + }, executor.chooseThread(name)); + return future; + } + + private void persistTrashIfNecessary() { + if (trashIsDirty) { + asyncUpdateTrashData(); + } + } + + private int calculateArchiveCount() { + int toArchiveCount = 0; + for (TrashKey key : trashData.keySet()) { + if (key.retryCount != 0) { + break; + } + toArchiveCount++; + } + return toArchiveCount; + } + + @Override + public void appendLedgerTrashData(long ledgerId, LedgerInfo context, LedgerType type) + throws ManagedLedgerException { + State state = STATE_UPDATER.get(this); + if (State.FENCED == state || State.Closed == state) { + throw ManagedLedgerException.getManagedLedgerException(new IllegalStateException( + String.format("[%s] is not initialized, current state: %s", name(), state))); + } + if (context == null) { + context = EMPTY_LEDGER_INFO; + } + TrashKey key = null; + 
if (ManagedTrash.LedgerType.LEDGER.equals(type)) { + key = TrashKey.buildKey(maxDeleteCount, ledgerId, 0L, type); + } else if (ManagedTrash.LedgerType.OFFLOAD_LEDGER.equals(type)) { + key = TrashKey.buildKey(maxDeleteCount, ledgerId, context.getOffloadContext().getUidMsb(), type); + } + trashData.put(key, context); + managedTrashMXBean.increaseTotalNumberOfDeleteLedgers(); + trashIsDirty = true; + + } + + @Override + public CompletableFuture asyncUpdateTrashData() { + log.info("{} Start async update trash data", name()); + CompletableFuture future = new CompletableFuture<>(); + doAsyncUpdateTrashData(future); + return future; + } + + public void asyncUpdateTrashDataInBackground(CompletableFuture future) { + executor.executeOrdered(name, safeRun(() -> doAsyncUpdateTrashData(future))); + } + + private void doAsyncUpdateTrashData(CompletableFuture future) { + State state = STATE_UPDATER.get(this); + if (State.Closed == state) { + future.completeExceptionally(ManagedLedgerException.getManagedLedgerException( + new IllegalStateException(String.format("[%s] is closed.", name())))); + return; + } + if (!trashMutex.tryLock()) { + scheduledExecutor.schedule(() -> asyncUpdateTrashDataInBackground(future), 100, TimeUnit.MILLISECONDS); + return; + } + metadataStore.put(buildDeletePath(), serialize(trashData), + deleteStat == null ? Optional.of(-1L) : Optional.of(deleteStat.getVersion())) + .whenCompleteAsync((res, e) -> { + if (e != null) { + future.completeExceptionally(getMetaStoreException(e)); + trashMutex.unlock(); + return; + } + deleteStat = res; + trashIsDirty = false; + future.complete(null); + trashMutex.unlock(); + }, executor.chooseThread(name)); + } + + + public static byte[] serialize(Map toPersist) { + Map transfer = transferTo(toPersist); + TrashDataComponent.Builder builder = TrashDataComponent.newBuilder(); + for (Map.Entry entry : transfer.entrySet()) { + MLDataFormats.TrashData.Builder innerBuilder = MLDataFormats.TrashData.newBuilder().setKey(entry.getKey()); + if (entry.getValue().getLedgerId() != EMPTY_LEDGER_ID) { + innerBuilder.setValue(entry.getValue()); + } + builder.addComponent(innerBuilder.build()); + } + return builder.build().toByteArray(); + } + + private static Map transferTo(Map to) { + Map result = new ConcurrentSkipListMap<>(); + for (Map.Entry entry : to.entrySet()) { + result.put(entry.getKey().toStringKey(), entry.getValue()); + } + return result; + } + + public static NavigableMap deSerialize(byte[] content) throws InvalidProtocolBufferException { + TrashDataComponent component = TrashDataComponent.parseFrom(content); + List componentList = component.getComponentList(); + Map result = new ConcurrentSkipListMap<>(); + for (MLDataFormats.TrashData ele : componentList) { + if (ele.hasValue()) { + result.put(ele.getKey(), ele.getValue()); + } else { + result.put(ele.getKey(), EMPTY_LEDGER_INFO); + } + } + return transferFrom(result); + } + + + private static NavigableMap transferFrom(Map from) { + NavigableMap result = new ConcurrentSkipListMap<>(); + for (Map.Entry entry : from.entrySet()) { + result.put(TrashKey.buildKey(entry.getKey()), entry.getValue()); + } + return result; + } + + @Override + public void triggerDeleteInBackground() { + State state = STATE_UPDATER.get(this); + LedgerType type; + // if the state is FENCED, only normal ledgers (not offloaded ledgers) are deleted + if (State.FENCED == state) { + type = LedgerType.LEDGER; + } else { + type = LedgerType.BOTH; + } + executor.executeOrdered(name, safeRun(() -> triggerDelete(type))); + } + + + private void triggerDelete(LedgerType
type) { + State state = STATE_UPDATER.get(this); + if (State.Closed == state) { + log.warn("[{}] is closed", name()); + return; + } + if (!deleteMutex.tryLock()) { + continueDeleteImmediately.incrementAndGet(); + return; + } + Tuple tuple = getToDeleteData(type); + if (tuple.toDelete.size() == 0) { + deleteMutex.unlock(); + //if entries were filtered out, there is still trash waiting to be deleted, so keep retrying + if (tuple.filtered) { + continueDeleteIfNecessary(); + } else { + continueDeleteImmediately.set(0); + } + return; + } + List> futures = new ArrayList<>(); + for (DelHelper delHelper : tuple.toDelete) { + futures.add(asyncDeleteTrash(delHelper)); + } + FutureUtil.waitForAll(futures).whenCompleteAsync((res, e) -> { + deleteMutex.unlock(); + continueDeleteIfNecessary(); + }); + } + + @Override + public CompletableFuture asyncCloseAfterAllLedgerDeleteOnce() { + //fence the state so that no more trash data can be appended. + STATE_UPDATER.set(this, State.FENCED); + CompletableFuture future = new CompletableFuture<>(); + allTrashDataDeleteOnce(future); + return future.thenCompose(ignore -> { + CompletableFuture finalFuture = new CompletableFuture<>(); + asyncClose().whenComplete((res, e) -> { + if (e != null) { + finalFuture.completeExceptionally(e); + return; + } + finalFuture.complete(null); + }); + return finalFuture; + }); + } + + private void allTrashDataDeleteOnce(CompletableFuture future) { + if (trashData.isEmpty()) { + future.complete(null); + return; + } + Optional> lastLedgerTrashData = + trashData.descendingMap().entrySet().stream().filter(ele -> + LedgerType.OFFLOAD_LEDGER != ele.getKey().getType()).findFirst(); + if (lastLedgerTrashData.isEmpty()) { + future.complete(null); + return; + } + if (lastLedgerTrashData.get().getKey().getRetryCount() < maxDeleteCount) { + future.complete(null); + return; + } + triggerDeleteInBackground(); + scheduledExecutor.schedule(() -> allTrashDataDeleteOnceInBackground(future), 1000, + TimeUnit.MILLISECONDS); + } + + private void allTrashDataDeleteOnceInBackground(CompletableFuture future) { + executor.executeOrdered(name, safeRun(() -> allTrashDataDeleteOnce(future))); + } + + @Override + public CompletableFuture> getAllArchiveIndex() { + return metadataStore.getChildren(buildParentPath()).thenComposeAsync(children -> { + CompletableFuture> future = new CompletableFuture<>(); + if (CollectionUtils.isEmpty(children)) { + future.complete(Collections.emptyList()); + return future; + } + List archiveIndexes = new ArrayList<>(); + for (String ele : children) { + if (!ele.startsWith(ARCHIVE)) { + continue; + } + String indexStr = ele.split(ARCHIVE)[1]; + archiveIndexes.add(Long.parseLong(indexStr)); + } + future.complete(archiveIndexes); + return future; + }, executor.chooseThread(name)); + } + + @Override + public CompletableFuture> getArchiveData(final long index) { + return metadataStore.get(buildArchivePath(index)).thenComposeAsync(optResult -> { + CompletableFuture> future = new CompletableFuture<>(); + if (optResult.isPresent()) { + byte[] content = optResult.get().getValue(); + try { + Map result = deSerialize(content); + future.complete(result); + } catch (InvalidProtocolBufferException e) { + future.completeExceptionally(e); + } + } else { + future.complete(Collections.emptyMap()); + } + return future; + }, executor.chooseThread(name)); + } + + public long getTrashDataSize() { + return trashData.size(); + } + + public long getToArchiveDataSize() { + return toArchiveCount.get(); + } + + @Override + public CompletableFuture asyncClose() { + CompletableFuture future = new CompletableFuture<>(); + if (State.Closed == STATE_UPDATER.get(this)) { + future.complete(null); +
return future; + } + if (checkTrashPersistTask != null) { + checkTrashPersistTask.cancel(true); + checkTrashPersistTask = null; + } + asyncUpdateTrashData().whenComplete((res, e) -> { + if (e != null) { + future.completeExceptionally(e); + return; + } + future.complete(null); + }); + STATE_UPDATER.set(this, State.Closed); + return future; + } + + private CompletableFuture increaseArchiveCountWhenDeleteFailed() { + CompletableFuture future = new CompletableFuture<>(); + toArchiveCount.incrementAndGet(); + managedTrashMXBean.increaseTotalNumberOfArchiveLedgers(); + updateArchiveDataIfNecessary(future); + return future; + } + + private void updateArchiveDataIfNecessary(final CompletableFuture future) { + if (toArchiveCount.get() < archiveDataLimitSize) { + future.complete(null); + return; + } + asyncUpdateArchiveData(future); + } + + private String buildParentPath() { + return PREFIX + type + "/" + name; + } + + private String buildDeletePath() { + return buildParentPath() + DELETE_SUFFIX; + } + + private String buildArchivePath(long index) { + return buildParentPath() + ARCHIVE_SUFFIX + index; + } + + //delete 1/10 of the trash per batch, capped at 10 entries and floored at 1. + protected Tuple getToDeleteData(LedgerType type) { + if (trashData.size() == 0) { + return new Tuple(Collections.emptyList(), false); + } + int batchSize = trashData.size() / 10; + if (batchSize > 10) { + batchSize = 10; + } + if (batchSize == 0) { + batchSize = 1; + } + boolean filtered = false; + List toDelete = new ArrayList<>(batchSize); + + for (Map.Entry entry : trashData.descendingMap().entrySet()) { + //keys sort by retryCount first, so in this descending scan, once retryCount hits zero + //every remaining entry has retryCount zero as well. + if (entry.getKey().retryCount == 0) { + break; + } + if (LedgerType.BOTH != type && type != entry.getKey().getType()) { + continue; + } + //skip entries whose last delete attempt is less than retryDeleteIntervalMillis ago, + //and entries whose ledger still exists in managedLedgers. + if (System.currentTimeMillis() - entry.getKey().lastDeleteTs < retryDeleteIntervalMillis + || managedLedgers.containsKey(entry.getKey().getLedgerId())) { + filtered = true; + continue; + } + + toDelete.add(DelHelper.buildHelper(entry.getKey(), entry.getValue())); + if (toDelete.size() == batchSize) { + break; + } + } + return new Tuple(toDelete, filtered); + } + + static class Tuple { + private List toDelete; + private boolean filtered; + + public Tuple(List toDelete, boolean filtered) { + this.toDelete = toDelete; + this.filtered = filtered; + } + + public List getToDelete() { + return toDelete; + } + + public boolean isFiltered() { + return filtered; + } + } + + private void asyncUpdateArchiveData(CompletableFuture future) { + if (!archiveMutex.tryLock()) { + future.complete(null); + return; + } + log.info("[{}] Start async update archive data", name()); + + State state = STATE_UPDATER.get(this); + if (State.Closed == state) { + future.completeExceptionally(ManagedLedgerException.getManagedLedgerException( + new IllegalStateException(String.format("[%s] is closed.", name())))); + archiveMutex.unlock(); + return; + } + asyncUpdateTrashData().thenAccept(ignore -> { + NavigableMap persistArchive = new ConcurrentSkipListMap<>(); + //trashData is not locked here, so the persisted archive may be discontinuous, e.g. 1,2,3,10,12... + for (Map.Entry entry : trashData.entrySet()) { + //in theory, retryCount can't be greater than 0 here; the check is defensive.
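+ //TrashKey orders by retryCount first (see TrashKey#compareTo), so an ascending scan + //visits exhausted entries (retryCount == 0) before any entry that still has retries left.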
+ if (entry.getKey().retryCount > 0 || persistArchive.size() >= archiveDataLimitSize) { + break; + } + persistArchive.put(entry.getKey(), entry.getValue()); + } + + metadataStore.put(buildArchivePath(System.currentTimeMillis()), serialize(persistArchive), + Optional.of(-1L)).whenCompleteAsync((res, e) -> { + if (e != null) { + log.error("[{}] Persist archive data failed.", name(), e); + future.completeExceptionally(getMetaStoreException(e)); + archiveMutex.unlock(); + return; + } + persistArchive.keySet().forEach(ele -> trashData.remove(ele)); + trashIsDirty = true; + for (int i = 0; i < persistArchive.size(); i++) { + toArchiveCount.decrementAndGet(); + } + asyncUpdateTrashData().whenComplete((res1, e1) -> { + if (e1 != null) { + future.completeExceptionally(getMetaStoreException(e1)); + archiveMutex.unlock(); + return; + } + future.complete(null); + archiveMutex.unlock(); + }); + }, executor.chooseThread(name)); + }).exceptionally(e -> { + log.error("[{}] Persist archive data failed.", name(), e); + future.completeExceptionally(getMetaStoreException(e)); + archiveMutex.unlock(); + return null; + }); + + + } + + private CompletableFuture asyncDeleteTrash(DelHelper delHelper) { + if (delHelper.key.isLedger()) { + CompletableFuture future = asyncDeleteLedger(delHelper.key.ledgerId); + future.whenCompleteAsync((res, e) -> { + if (e != null) { + onDeleteFailed(delHelper); + return; + } + onDeleteSuccess(delHelper); + }, executor.chooseThread(name)); + return future; + } else if (delHelper.key.isOffloadLedger()) { + CompletableFuture future = asyncDeleteOffloadedLedger(delHelper.key.ledgerId, delHelper.context); + future.whenCompleteAsync((res, e) -> { + if (e != null) { + onDeleteFailed(delHelper); + return; + } + onDeleteSuccess(delHelper); + }, executor.chooseThread(name)); + return future; + } + return CompletableFuture.completedFuture(null); + } + + private void onDeleteSuccess(DelHelper helper) { + if (log.isDebugEnabled()) { + String info = null; + if (helper.key.isLedger()) { + info = String.format("[%s] Delete ledger %s success.", name(), helper.key.ledgerId); + } else if (helper.key.isOffloadLedger()) { + info = String.format("[%s] Delete offload ledger %s success.", name(), helper.key.ledgerId); + } + log.debug(info); + } + trashData.remove(helper.key); + trashIsDirty = true; + } + + private void onDeleteFailed(DelHelper helper) { + //replace the old key with a new one whose retry count is decremented + trashData.remove(helper.key); + + TrashKey newKey = TrashKey.buildKey(helper.key.retryCount - 1, helper.key.ledgerId, helper.key.msb, + helper.key.type); + newKey.markLastDeleteTs(); + trashData.put(newKey, helper.context); + trashIsDirty = true; + if (helper.key.retryCount - 1 == 0) { + if (log.isWarnEnabled()) { + String info = null; + if (helper.key.isLedger()) { + info = String.format("[%s] Delete ledger %d reached retry limit %d.", name(), helper.key.ledgerId, + maxDeleteCount); + } else if (helper.key.isOffloadLedger()) { + info = String.format("[%s] Delete offload ledger %d reached retry limit %d.", name(), + helper.key.ledgerId, maxDeleteCount); + } + log.warn(info); + } + increaseArchiveCountWhenDeleteFailed(); + } + } + + private void continueDeleteIfNecessary() { + Map.Entry lastEntry = trashData.lastEntry(); + if (lastEntry == null) { + return; + } + if (lastEntry.getKey().retryCount > 0) { + if (continueDeleteImmediately.get() > 0) { + triggerDeleteInBackground(); + continueDeleteImmediately.decrementAndGet(); + } else { + scheduledExecutor.schedule(this::triggerDeleteInBackground, nextDeleteDelayMillis, +
TimeUnit.MILLISECONDS); + } + } + } + + private CompletableFuture asyncDeleteLedger(long ledgerId) { + CompletableFuture future = new CompletableFuture<>(); + log.info("[{}] Start async delete ledger {}", name(), ledgerId); + bookKeeper.asyncDeleteLedger(ledgerId, (rc, ctx) -> { + if (isNoSuchLedgerExistsException(rc)) { + log.warn("[{}] Ledger was already deleted {}", name(), ledgerId); + } else if (rc != BKException.Code.OK) { + log.error("[{}] Error deleting ledger {} : {}", name(), ledgerId, BKException.getMessage(rc)); + future.completeExceptionally(ManagedLedgerImpl.createManagedLedgerException(rc)); + return; + } + if (log.isDebugEnabled()) { + log.debug("[{}] Deleted ledger {}", name(), ledgerId); + } + future.complete(null); + }, null); + return future; + } + + private CompletableFuture asyncDeleteOffloadedLedger(long ledgerId, LedgerInfo info) { + CompletableFuture future = new CompletableFuture<>(); + if (!info.getOffloadContext().hasUidMsb()) { + future.completeExceptionally(new IllegalArgumentException( + String.format("[%s] Failed to delete offload for ledgerId %s, can't find offload context.", name(), + ledgerId))); + return future; + } + String cleanupReason = "Trash-Trimming"; + + UUID uuid = new UUID(info.getOffloadContext().getUidMsb(), info.getOffloadContext().getUidLsb()); + + log.info("[{}] Start async delete offloaded ledger, ledgerId {} uuid {} (cleanup reason: {}).", name(), + ledgerId, uuid, cleanupReason); + + Map metadataMap = Maps.newHashMap(); + metadataMap.putAll(config.getLedgerOffloader().getOffloadDriverMetadata()); + metadataMap.put("ManagedLedgerName", name); + + try { + config.getLedgerOffloader() + .deleteOffloaded(ledgerId, uuid, metadataMap) + .whenComplete((ignored, exception) -> { + if (exception != null) { + log.warn("[{}] Failed to delete offload for ledgerId {} uuid {}, (cleanup reason: {})", + name(), ledgerId, uuid, cleanupReason, exception); + future.completeExceptionally( + new ManagedLedgerException("Failed to delete offloaded ledger")); + return; + } + future.complete(null); + }); + } catch (Exception e) { + log.warn("[{}] Failed to delete offloaded ledgers.", name(), e); + future.completeExceptionally(ManagedLedgerException.getManagedLedgerException(e)); + } + return future; + } + + private static ManagedLedgerException.MetaStoreException getMetaStoreException(Throwable t) { + if (t.getCause() instanceof MetadataStoreException.BadVersionException) { + return new ManagedLedgerException.BadVersionException(t.getMessage()); + } else { + return new ManagedLedgerException.MetaStoreException(t); + } + } + + protected static class DelHelper { + private final TrashKey key; + private final LedgerInfo context; + + public DelHelper(TrashKey key, LedgerInfo context) { + this.key = key; + this.context = context; + } + + public TrashKey getKey() { + return key; + } + + public LedgerInfo getContext() { + return context; + } + + public static DelHelper buildHelper(TrashKey key, LedgerInfo context) { + return new DelHelper(key, context); + } + } + + public static class TrashKey implements Comparable { + + private final int retryCount; + + private final long ledgerId; + + //the same ledgerId may correspond to two offload storages.
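+ //Note: msb holds the most significant bits of the offload UUID, distinguishing two offload + //attempts of the same ledger; plain ledger keys use 0 (see appendLedgerTrashData).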
+ private final long msb; + + private final LedgerType type; + + private long lastDeleteTs; + + public TrashKey(int retryCount, long ledgerId, long msb, LedgerType type) { + this.retryCount = retryCount; + this.ledgerId = ledgerId; + this.msb = msb; + this.type = type; + } + + private void markLastDeleteTs() { + this.lastDeleteTs = System.currentTimeMillis(); + } + + private String toStringKey() { + return retryCount + TRASH_KEY_SEPARATOR + ledgerId + TRASH_KEY_SEPARATOR + msb + TRASH_KEY_SEPARATOR + + type; + } + + public static TrashKey buildKey(int retryCount, long ledgerId, long msb, LedgerType type) { + return new TrashKey(retryCount, ledgerId, msb, type); + } + + public static TrashKey buildKey(String strKey) { + String[] split = strKey.split(TRASH_KEY_SEPARATOR); + int retryCount = Integer.parseInt(split[0]); + long ledgerId = Long.parseLong(split[1]); + long msb = Long.parseLong(split[2]); + LedgerType type = LedgerType.valueOf(split[3]); + return new TrashKey(retryCount, ledgerId, msb, type); + } + + public int getRetryCount() { + return retryCount; + } + + public long getLedgerId() { + return ledgerId; + } + + public long getMsb() { + return msb; + } + + public LedgerType getType() { + return type; + } + + private boolean isLedger() { + return LedgerType.LEDGER.equals(type); + } + + private boolean isOffloadLedger() { + return LedgerType.OFFLOAD_LEDGER.equals(type); + } + + @Override + public int compareTo(TrashKey other) { + if (other == this) { + return 0; + } + int c1 = this.retryCount - other.retryCount; + if (c1 != 0) { + return c1; + } + long c2 = this.ledgerId - other.ledgerId; + if (c2 != 0) { + return c2 > 0 ? 1 : -1; + } + long c3 = this.msb - other.msb; + if (c3 != 0) { + return c3 > 0 ? 1 : -1; + } + return this.type.compareTo(other.type); + } + + @Override + public String toString() { + return "TrashKey{" + + "retryCount=" + + retryCount + + ", ledgerId=" + + ledgerId + + ", msb=" + + msb + + ", type=" + + type + + ", lastDeleteTs=" + + lastDeleteTs + + '}'; + } + } + + public enum State { + None, + Initialized, + FENCED, + Closed, + } + + +} diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedTrashMXBeanImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedTrashMXBeanImpl.java new file mode 100644 index 0000000000000..0afb33112f10a --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedTrashMXBeanImpl.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.bookkeeper.mledger.impl; + +import java.util.concurrent.atomic.AtomicLong; +import org.apache.bookkeeper.mledger.ManagedTrash; +import org.apache.bookkeeper.mledger.ManagedTrashMXBean; + +public class ManagedTrashMXBeanImpl implements ManagedTrashMXBean { + + private final ManagedTrash managedTrash; + + private final AtomicLong totalNumberOfDeletedCount = new AtomicLong(); + + private final AtomicLong totalNumberOfArchiveCount = new AtomicLong(); + + public ManagedTrashMXBeanImpl(ManagedTrash managedTrash) { + this.managedTrash = managedTrash; + } + + @Override + public String getName() { + return managedTrash.name(); + } + + @Override + public long getCurrentNumberOfLedgersWaitingToDelete() { + if (managedTrash instanceof ManagedTrashImpl) { + return ((ManagedTrashImpl) managedTrash).getTrashDataSize(); + } + return 0; + } + + @Override + public void increaseTotalNumberOfDeleteLedgers() { + totalNumberOfDeletedCount.incrementAndGet(); + } + + @Override + public long getTotalNumberOfDeleteLedgers() { + return totalNumberOfDeletedCount.get(); + } + + @Override + public long getCurrentNumberOfLedgersWaitingToArchive() { + if (managedTrash instanceof ManagedTrashImpl) { + return ((ManagedTrashImpl) managedTrash).getToArchiveDataSize(); + } + return 0; + } + + @Override + public void increaseTotalNumberOfArchiveLedgers() { + totalNumberOfArchiveCount.incrementAndGet(); + } + + @Override + public long getTotalNumberOfArchiveLedgers() { + return totalNumberOfArchiveCount.get(); + } +} diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java index 214c3afe1bcdb..d3c4d6baa615d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java @@ -34,15 +34,16 @@ import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; +import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.Stat; @Slf4j public class ReadOnlyManagedLedgerImpl extends ManagedLedgerImpl { public ReadOnlyManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, MetaStore store, - ManagedLedgerConfig config, OrderedScheduler scheduledExecutor, - String name) { - super(factory, bookKeeper, store, config, scheduledExecutor, name); + MetadataStore metadataStore, ManagedLedgerConfig config, + OrderedScheduler scheduledExecutor, String name) { + super(factory, bookKeeper, store, metadataStore, config, scheduledExecutor, name); } CompletableFuture initializeAndCreateCursor(PositionImpl startPosition) { diff --git a/managed-ledger/src/main/proto/MLDataFormats.proto b/managed-ledger/src/main/proto/MLDataFormats.proto index c4e502819fa9e..0bc1100f544fd 100644 --- a/managed-ledger/src/main/proto/MLDataFormats.proto +++ b/managed-ledger/src/main/proto/MLDataFormats.proto @@ -151,3 +151,13 @@ message ManagedCursorInfoMetadata { required CompressionType compressionType = 1; required int32 uncompressedSize = 2; } + +message TrashDataComponent { + repeated TrashData component = 1; +} + +message TrashData { + required string key = 1; + optional ManagedLedgerInfo.LedgerInfo value = 2; +} + diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedTrashTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedTrashTest.java new file mode 100644 index 0000000000000..2318595a008ea --- /dev/null +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedTrashTest.java @@ -0,0 +1,750 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import com.google.common.base.Charsets; +import java.lang.reflect.Field; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.bookkeeper.client.AsyncCallback; +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.PulsarMockBookKeeper; +import org.apache.bookkeeper.common.concurrent.FutureUtils; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.ManagedTrash; +import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; +import org.apache.bookkeeper.test.MockedBookKeeperTestCase; +import org.apache.pulsar.metadata.api.MetadataStoreConfig; +import org.apache.pulsar.metadata.api.Stat; +import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; +import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore; +import org.awaitility.Awaitility; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class ManagedTrashTest extends MockedBookKeeperTestCase { + + private static final Charset Encoding = Charsets.UTF_8; + + private List deletedLedgers = new ArrayList<>(); + + PulsarMockBookKeeper bkc; + + FaultInjectionMetadataStore metadataStore; + + private Map persistedData = new HashMap<>(); + + + @Override + protected void setUpTestCase() throws Exception { + metadataStore = new FaultInjectionMetadataStore( + MetadataStoreExtended.create("memory:local", MetadataStoreConfig.builder().build())) { + @Override + public CompletableFuture put(String path, byte[] value, Optional expectedVersion) { + CompletableFuture future = super.put(path, value, expectedVersion); + future.whenComplete((res, e) -> { + if (e != null) { + return; + } + persistedData.put(path, value); + }); + return future; + } + + }; + bkc 
= new PulsarMockBookKeeper(executor) { + @Override + public void asyncDeleteLedger(long lId, AsyncCallback.DeleteCallback cb, Object ctx) { + getProgrammedFailure().thenComposeAsync((res) -> { + if (lId >= 10000 && lId < 20000) { + throw new IllegalArgumentException("LedgerId is invalid"); + } + if (lId >= 20000) { + //for ledgerId >= 20000, deletion succeeds whether or not the mock holds the ledger. + deletedLedgers.add(lId); + return FutureUtils.value(null); + } + if (ledgers.containsKey(lId)) { + ledgers.remove(lId); + deletedLedgers.add(lId); + return FutureUtils.value(null); + } else { + return FutureUtils.exception(new BKException.BKNoSuchLedgerExistsException()); + } + }, executor).whenCompleteAsync((res, exception) -> { + if (exception != null) { + cb.deleteComplete(getExceptionCode(exception), ctx); + } else { + cb.deleteComplete(BKException.Code.OK, ctx); + } + }, executor); + } + }; + factory = new ManagedLedgerFactoryImpl(metadataStore, bkc); + } + + @Override + protected void cleanUpTestCase() throws Exception { + super.cleanUpTestCase(); + deletedLedgers.clear(); + persistedData.clear(); + } + + private LedgerInfo buildOffloadLedgerInfo(long ledgerId, UUID uuid) { + LedgerInfo.Builder builder = LedgerInfo.newBuilder().setLedgerId(ledgerId); + builder.getOffloadContextBuilder() + .setUidMsb(uuid.getMostSignificantBits()) + .setUidLsb(uuid.getLeastSignificantBits()); + return builder.build(); + } + + @Test + public void testTrashKeyOrder() throws Exception { + ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); + managedLedgerConfig.setSupportTwoPhaseDeletion(true); + managedLedgerConfig.setRetryDeleteIntervalSeconds(1); + managedLedgerConfig.setArchiveDataLimitSize(10); + ManagedLedger ledger = factory.open("my_test_ledger", managedLedgerConfig); + + Field field = ManagedLedgerImpl.class.getDeclaredField("managedTrash"); + field.setAccessible(true); + ManagedTrashImpl managedTrash = (ManagedTrashImpl) field.get(ledger); + + managedTrash.appendLedgerTrashData(10, null, ManagedTrash.LedgerType.LEDGER); + managedTrash.appendLedgerTrashData(6, null, ManagedTrash.LedgerType.LEDGER); + managedTrash.appendLedgerTrashData(100, null, ManagedTrash.LedgerType.LEDGER); + managedTrash.appendLedgerTrashData(3, null, ManagedTrash.LedgerType.LEDGER); + managedTrash.appendLedgerTrashData(7, null, ManagedTrash.LedgerType.LEDGER); + + + UUID uuid = UUID.randomUUID(); + LedgerInfo ledgerInfo = buildOffloadLedgerInfo(7, uuid); + managedTrash.appendLedgerTrashData(7, ledgerInfo, ManagedTrash.LedgerType.OFFLOAD_LEDGER); + + Field field1 = ManagedTrashImpl.class.getDeclaredField("trashData"); + field1.setAccessible(true); + ConcurrentSkipListMap map = (ConcurrentSkipListMap) field1.get(managedTrash); + + Map.Entry entry = map.pollFirstEntry(); + Assert.assertEquals(entry.getKey().getLedgerId(), 3); + Assert.assertEquals(entry.getKey().getMsb(), 0); + Assert.assertEquals(entry.getKey().getType(), ManagedTrash.LedgerType.LEDGER); + + entry = map.pollFirstEntry(); + Assert.assertEquals(entry.getKey().getLedgerId(), 6); + Assert.assertEquals(entry.getKey().getMsb(), 0); + Assert.assertEquals(entry.getKey().getType(), ManagedTrash.LedgerType.LEDGER); + + if (uuid.getMostSignificantBits() > 0) { + entry = map.pollFirstEntry(); + Assert.assertEquals(entry.getKey().getLedgerId(), 7); + Assert.assertEquals(entry.getKey().getMsb(), 0); + Assert.assertEquals(entry.getKey().getType(), ManagedTrash.LedgerType.LEDGER); + + entry = map.pollFirstEntry(); +
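//a positive msb sorts the OFFLOAD_LEDGER key after the plain LEDGER key with the same ledgerId +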
Assert.assertEquals(entry.getKey().getLedgerId(), 7); + Assert.assertEquals(entry.getKey().getMsb(), uuid.getMostSignificantBits()); + Assert.assertEquals(entry.getKey().getType(), ManagedTrash.LedgerType.OFFLOAD_LEDGER); + } else { + entry = map.pollFirstEntry(); + Assert.assertEquals(entry.getKey().getLedgerId(), 7); + Assert.assertEquals(entry.getKey().getMsb(), uuid.getMostSignificantBits()); + Assert.assertEquals(entry.getKey().getType(), ManagedTrash.LedgerType.OFFLOAD_LEDGER); + + entry = map.pollFirstEntry(); + Assert.assertEquals(entry.getKey().getLedgerId(), 7); + Assert.assertEquals(entry.getKey().getMsb(), 0); + Assert.assertEquals(entry.getKey().getType(), ManagedTrash.LedgerType.LEDGER); + } + + entry = map.pollFirstEntry(); + Assert.assertEquals(entry.getKey().getLedgerId(), 10); + Assert.assertEquals(entry.getKey().getMsb(), 0); + Assert.assertEquals(entry.getKey().getType(), ManagedTrash.LedgerType.LEDGER); + + entry = map.pollFirstEntry(); + Assert.assertEquals(entry.getKey().getLedgerId(), 100); + Assert.assertEquals(entry.getKey().getMsb(), 0); + Assert.assertEquals(entry.getKey().getType(), ManagedTrash.LedgerType.LEDGER); + } + + + @Test + public void testGetToDeleteDataFilterByManagedLedgers() throws Exception { + ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); + managedLedgerConfig.setSupportTwoPhaseDeletion(true); + managedLedgerConfig.setMaxEntriesPerLedger(10); + managedLedgerConfig.setRetentionTime(3, TimeUnit.SECONDS); + ManagedLedger ledger = factory.open("my_test_ledger", managedLedgerConfig); + + Field field = ManagedLedgerImpl.class.getDeclaredField("managedTrash"); + field.setAccessible(true); + ManagedTrashImpl managedTrash = (ManagedTrashImpl) field.get(ledger); + + Field field1 = ManagedTrashImpl.class.getDeclaredField("managedLedgers"); + field1.setAccessible(true); + + NavigableMap managedLedgers = (NavigableMap) field1.get(managedTrash); + + LedgerInfo emptyLedgerInfo = LedgerInfo.newBuilder().setLedgerId(-1).build(); + for (int i = 0; i < 30; i++) { + long ledgerId = 10000 + i; + managedLedgers.put(ledgerId, emptyLedgerInfo); + if (i % 2 == 0) { + managedTrash.appendLedgerTrashData(ledgerId, null, ManagedTrash.LedgerType.LEDGER); + } else { + //build offload ledger + UUID uuid = UUID.randomUUID(); + LedgerInfo ledgerInfo = buildOffloadLedgerInfo(ledgerId, uuid); + managedTrash.appendLedgerTrashData(ledgerId, ledgerInfo, ManagedTrash.LedgerType.OFFLOAD_LEDGER); + } + } + ManagedTrashImpl.Tuple tuple = managedTrash.getToDeleteData(ManagedTrash.LedgerType.BOTH); + assertEquals(tuple.getToDelete().size(), 0); + assertEquals(tuple.isFiltered(), true); + + //once ledger 10029 is removed from managedLedgers, it becomes eligible for deletion.
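+ //10029 was appended at an odd index, so it is the OFFLOAD_LEDGER variant.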
+ managedLedgers.remove(10029L); + + tuple = managedTrash.getToDeleteData(ManagedTrash.LedgerType.BOTH); + assertEquals(tuple.getToDelete().size(), 1); + assertEquals(tuple.isFiltered(), true); + assertEquals(tuple.getToDelete().get(0).getKey().getLedgerId(), 10029L); + assertEquals(tuple.getToDelete().get(0).getKey().getType(), ManagedTrash.LedgerType.OFFLOAD_LEDGER); + + + tuple = managedTrash.getToDeleteData(ManagedTrash.LedgerType.LEDGER); + assertEquals(tuple.getToDelete().size(), 0); + assertEquals(tuple.isFiltered(), true); + + + tuple = managedTrash.getToDeleteData(ManagedTrash.LedgerType.OFFLOAD_LEDGER); + assertEquals(tuple.getToDelete().size(), 1); + assertEquals(tuple.isFiltered(), true); + assertEquals(tuple.getToDelete().get(0).getKey().getLedgerId(), 10029L); + assertEquals(tuple.getToDelete().get(0).getKey().getType(), ManagedTrash.LedgerType.OFFLOAD_LEDGER); + + managedLedgers.remove(10028L); + + tuple = managedTrash.getToDeleteData(ManagedTrash.LedgerType.BOTH); + assertEquals(tuple.getToDelete().size(), 2); + assertEquals(tuple.isFiltered(), true); + assertEquals(tuple.getToDelete().get(0).getKey().getLedgerId(), 10029L); + assertEquals(tuple.getToDelete().get(0).getKey().getType(), ManagedTrash.LedgerType.OFFLOAD_LEDGER); + assertEquals(tuple.getToDelete().get(1).getKey().getLedgerId(), 10028L); + assertEquals(tuple.getToDelete().get(1).getKey().getType(), ManagedTrash.LedgerType.LEDGER); + + tuple = managedTrash.getToDeleteData(ManagedTrash.LedgerType.LEDGER); + assertEquals(tuple.getToDelete().size(), 1); + assertEquals(tuple.isFiltered(), true); + assertEquals(tuple.getToDelete().get(0).getKey().getLedgerId(), 10028L); + assertEquals(tuple.getToDelete().get(0).getKey().getType(), ManagedTrash.LedgerType.LEDGER); + + tuple = managedTrash.getToDeleteData(ManagedTrash.LedgerType.OFFLOAD_LEDGER); + assertEquals(tuple.getToDelete().size(), 1); + assertEquals(tuple.isFiltered(), true); + assertEquals(tuple.getToDelete().get(0).getKey().getLedgerId(), 10029L); + assertEquals(tuple.getToDelete().get(0).getKey().getType(), ManagedTrash.LedgerType.OFFLOAD_LEDGER); + } + + @Test + public void testGetToDeleteDataFilterByLastTimeStamp() throws Exception { + int maxDeleteCount = 3; + int deleteIntervalSeconds = 5; + ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); + managedLedgerConfig.setSupportTwoPhaseDeletion(true); + managedLedgerConfig.setMaxEntriesPerLedger(10); + managedLedgerConfig.setRetentionTime(3, TimeUnit.SECONDS); + managedLedgerConfig.setMaxDeleteCount(maxDeleteCount); + managedLedgerConfig.setRetryDeleteIntervalSeconds(deleteIntervalSeconds); + ManagedLedger ledger = factory.open("my_test_ledger", managedLedgerConfig); + + Field field = ManagedLedgerImpl.class.getDeclaredField("managedTrash"); + field.setAccessible(true); + ManagedTrashImpl managedTrash = (ManagedTrashImpl) field.get(ledger); + + for (int i = 0; i < 30; i++) { + long ledgerId = 10000 + i; + if (i % 2 == 0) { + managedTrash.appendLedgerTrashData(ledgerId, null, ManagedTrash.LedgerType.LEDGER); + } else { + //build offload ledger + UUID uuid = UUID.randomUUID(); + LedgerInfo ledgerInfo = buildOffloadLedgerInfo(ledgerId, uuid); + managedTrash.appendLedgerTrashData(ledgerId, ledgerInfo, ManagedTrash.LedgerType.OFFLOAD_LEDGER); + } + } + ManagedTrashImpl.Tuple tuple = managedTrash.getToDeleteData(ManagedTrash.LedgerType.BOTH); + assertEquals(tuple.getToDelete().size(), 3); + assertEquals(tuple.isFiltered(), false); + + tuple = 
managedTrash.getToDeleteData(ManagedTrash.LedgerType.LEDGER); + assertEquals(tuple.getToDelete().size(), 3); + assertEquals(tuple.isFiltered(), false); + + tuple = managedTrash.getToDeleteData(ManagedTrash.LedgerType.OFFLOAD_LEDGER); + assertEquals(tuple.getToDelete().size(), 3); + assertEquals(tuple.isFiltered(), false); + + for (int i = 0; i < 10; i++) { + managedTrash.triggerDeleteInBackground(); + } + + + Field field2 = ManagedTrashImpl.class.getDeclaredField("trashData"); + field2.setAccessible(true); + ConcurrentSkipListMap trashData = + (ConcurrentSkipListMap) field2.get(managedTrash); + + Awaitility.await().untilAsserted(() -> { + assertEquals(trashData.lastEntry().getKey().getRetryCount(), maxDeleteCount - 1); + }); + + + tuple = managedTrash.getToDeleteData(ManagedTrash.LedgerType.BOTH); + assertEquals(tuple.getToDelete().size(), 0); + assertEquals(tuple.isFiltered(), true); + + tuple = managedTrash.getToDeleteData(ManagedTrash.LedgerType.LEDGER); + assertEquals(tuple.getToDelete().size(), 0); + assertEquals(tuple.isFiltered(), true); + + tuple = managedTrash.getToDeleteData(ManagedTrash.LedgerType.OFFLOAD_LEDGER); + assertEquals(tuple.getToDelete().size(), 0); + assertEquals(tuple.isFiltered(), true); + + Thread.sleep(deleteIntervalSeconds * 1000); + + tuple = managedTrash.getToDeleteData(ManagedTrash.LedgerType.BOTH); + assertEquals(tuple.getToDelete().size(), 3); + assertEquals(tuple.isFiltered(), false); + + tuple = managedTrash.getToDeleteData(ManagedTrash.LedgerType.LEDGER); + assertEquals(tuple.getToDelete().size(), 3); + assertEquals(tuple.isFiltered(), false); + + tuple = managedTrash.getToDeleteData(ManagedTrash.LedgerType.OFFLOAD_LEDGER); + assertEquals(tuple.getToDelete().size(), 3); + assertEquals(tuple.isFiltered(), false); + } + + @Test + public void testTriggerDeleteOnTriggerSelf() throws Exception { + int deleteIntervalSeconds = 10; + ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); + managedLedgerConfig.setSupportTwoPhaseDeletion(true); + managedLedgerConfig.setMaxEntriesPerLedger(10); + managedLedgerConfig.setRetentionTime(3, TimeUnit.SECONDS); + managedLedgerConfig.setRetryDeleteIntervalSeconds(deleteIntervalSeconds); + ManagedLedger ledger = factory.open("my_test_ledger", managedLedgerConfig); + + Field field = ManagedLedgerImpl.class.getDeclaredField("managedTrash"); + field.setAccessible(true); + ManagedTrashImpl managedTrash = (ManagedTrashImpl) field.get(ledger); + + long nextDeleteDelayMillis = managedTrash.calculateNextDeleteDelayMillis(deleteIntervalSeconds * 1000); + + for (int i = 0; i < 30; i++) { + managedTrash.appendLedgerTrashData(20000 + i, null, ManagedTrash.LedgerType.LEDGER); + } + //trigger delete once; the second round will run `nextDeleteDelayMillis` after the first one finishes.
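+ //with a single trigger, continueDeleteImmediately stays at 0, so each follow-up round is scheduled with the configured delay.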
+ managedTrash.triggerDeleteInBackground(); + + AtomicLong firstTimeDelete = new AtomicLong(); + Awaitility.await().untilAsserted(() -> { + assertTrue(deletedLedgers.size() >= 3); + firstTimeDelete.set(System.currentTimeMillis()); + }); + + Awaitility.await().untilAsserted(() -> { + assertTrue(deletedLedgers.size() >= 6); + assertTrue(System.currentTimeMillis() - firstTimeDelete.get() > nextDeleteDelayMillis); + }); + } + + @Test + public void testTriggerDeleteOnImmediately() throws Exception { + int deleteIntervalSeconds = 10; + ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); + managedLedgerConfig.setSupportTwoPhaseDeletion(true); + managedLedgerConfig.setMaxEntriesPerLedger(10); + managedLedgerConfig.setRetentionTime(3, TimeUnit.SECONDS); + managedLedgerConfig.setRetryDeleteIntervalSeconds(deleteIntervalSeconds); + ManagedLedger ledger = factory.open("my_test_ledger", managedLedgerConfig); + + Field field = ManagedLedgerImpl.class.getDeclaredField("managedTrash"); + field.setAccessible(true); + ManagedTrashImpl managedTrash = (ManagedTrashImpl) field.get(ledger); + + long nextDeleteDelayMillis = managedTrash.calculateNextDeleteDelayMillis(deleteIntervalSeconds * 1000); + + for (int i = 0; i < 30; i++) { + managedTrash.appendLedgerTrashData(20000 + i, null, ManagedTrash.LedgerType.LEDGER); + } + //trigger delete twice; the second round continues immediately after the first one finishes. + managedTrash.triggerDeleteInBackground(); + managedTrash.triggerDeleteInBackground(); + + AtomicLong firstTimeDelete = new AtomicLong(); + Awaitility.await().untilAsserted(() -> { + assertTrue(deletedLedgers.size() >= 3); + firstTimeDelete.set(System.currentTimeMillis()); + }); + + Awaitility.await().untilAsserted(() -> { + assertTrue(deletedLedgers.size() >= 6); + assertTrue(System.currentTimeMillis() - firstTimeDelete.get() < nextDeleteDelayMillis); + }); + } + + @Test + public void testManagedTrashDelete() throws Exception { + ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); + managedLedgerConfig.setSupportTwoPhaseDeletion(true); + managedLedgerConfig.setMaxEntriesPerLedger(10); + managedLedgerConfig.setRetentionTime(1, TimeUnit.SECONDS); + ManagedLedger ledger = factory.open("my_test_ledger", managedLedgerConfig); + for (int i = 0; i < 100; i++) { + ledger.addEntry("test".getBytes(Encoding)); + } + Awaitility.await().untilAsserted(() -> { + assertEquals(deletedLedgers.size(), 9); + }); + } + + @Test + public void testManagedTrashArchive() throws Exception { + int entryCount = 30; + int archiveLimit = 10; + + ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); + managedLedgerConfig.setSupportTwoPhaseDeletion(true); + managedLedgerConfig.setRetryDeleteIntervalSeconds(1); + managedLedgerConfig.setArchiveDataLimitSize(archiveLimit); + managedLedgerConfig.setMaxDeleteCount(3); + ManagedLedger ledger = factory.open("my_test_ledger", managedLedgerConfig); + + Field field = ManagedLedgerImpl.class.getDeclaredField("managedTrash"); + field.setAccessible(true); + ManagedTrashImpl managedTrash = (ManagedTrashImpl) field.get(ledger); + + + //ledgers with id >= 10000 fail to delete; see the mocked asyncDeleteLedger in setUpTestCase.
+ for (int i = 0; i < entryCount; i++) { + managedTrash.appendLedgerTrashData(10000 + i, null, ManagedTrash.LedgerType.LEDGER); + } + managedTrash.triggerDeleteInBackground(); + + Awaitility.await().untilAsserted(() -> { + assertEquals(persistedData.size(), 5); + }); + + assertEquals(managedTrash.getTrashDataSize(), 0); + + Field field2 = ManagedTrashImpl.class.getDeclaredField("toArchiveCount"); + field2.setAccessible(true); + AtomicInteger toArchiveCount = (AtomicInteger) field2.get(managedTrash); + + assertEquals(toArchiveCount.get(), 0); + + + ConcurrentSkipListMap totalArchive = new ConcurrentSkipListMap<>(); + for (Map.Entry entry : persistedData.entrySet()) { + if (entry.getKey().startsWith("/managed-trash")) { + if (entry.getKey().endsWith("/delete")) { + byte[] value = entry.getValue(); + assertEquals(value.length, 0); + } else { + Map archiveData = + managedTrash.deSerialize(entry.getValue()); + assertEquals(archiveData.size(), archiveLimit); + totalArchive.putAll(archiveData); + } + } + } + + assertEquals(totalArchive.size(), entryCount); + int index = 0; + for (Map.Entry entry : totalArchive.entrySet()) { + assertEquals(entry.getKey().getLedgerId(), 10000 + index); + index++; + } + + } + + @Test + public void testManagedTrashClose() throws Exception { + //when managedTrash closes, it persists its trash data. + ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); + managedLedgerConfig.setSupportTwoPhaseDeletion(true); + managedLedgerConfig.setRetryDeleteIntervalSeconds(1); + managedLedgerConfig.setArchiveDataLimitSize(10); + ManagedLedger ledger = factory.open("my_test_ledger", managedLedgerConfig); + + Field field = ManagedLedgerImpl.class.getDeclaredField("managedTrash"); + field.setAccessible(true); + ManagedTrashImpl managedTrash = (ManagedTrashImpl) field.get(ledger); + + managedTrash.appendLedgerTrashData(10, null, ManagedTrash.LedgerType.LEDGER); + managedTrash.appendLedgerTrashData(6, null, ManagedTrash.LedgerType.LEDGER); + managedTrash.appendLedgerTrashData(100, null, ManagedTrash.LedgerType.LEDGER); + managedTrash.appendLedgerTrashData(3, null, ManagedTrash.LedgerType.LEDGER); + managedTrash.appendLedgerTrashData(7, null, ManagedTrash.LedgerType.LEDGER); + ledger.close(); + assertEquals(persistedData.size(), 2); + + byte[] content = persistedData.get("/managed-trash/managed-ledger/my_test_ledger/delete"); + NavigableMap persistedTrashData = + managedTrash.deSerialize(content); + + assertEquals(persistedTrashData.size(), 5); + + Map.Entry entry = + persistedTrashData.pollFirstEntry(); + + Assert.assertEquals(entry.getKey().getLedgerId(), 3); + Assert.assertEquals(entry.getKey().getMsb(), 0); + Assert.assertEquals(entry.getKey().getType(), ManagedTrash.LedgerType.LEDGER); + + entry = persistedTrashData.pollFirstEntry(); + + Assert.assertEquals(entry.getKey().getLedgerId(), 6); + Assert.assertEquals(entry.getKey().getMsb(), 0); + Assert.assertEquals(entry.getKey().getType(), ManagedTrash.LedgerType.LEDGER); + + entry = persistedTrashData.pollFirstEntry(); + + Assert.assertEquals(entry.getKey().getLedgerId(), 7); + Assert.assertEquals(entry.getKey().getMsb(), 0); + Assert.assertEquals(entry.getKey().getType(), ManagedTrash.LedgerType.LEDGER); + + entry = persistedTrashData.pollFirstEntry(); + + Assert.assertEquals(entry.getKey().getLedgerId(), 10); + Assert.assertEquals(entry.getKey().getMsb(), 0); + Assert.assertEquals(entry.getKey().getType(), ManagedTrash.LedgerType.LEDGER); + + entry = persistedTrashData.pollFirstEntry(); + +
Assert.assertEquals(entry.getKey().getLedgerId(), 100); + Assert.assertEquals(entry.getKey().getMsb(), 0); + Assert.assertEquals(entry.getKey().getType(), ManagedTrash.LedgerType.LEDGER); + } + + @Test + public void testGetAllArchiveIndex() throws Exception { + int entryCount = 30; + int archiveLimit = 10; + + ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); + managedLedgerConfig.setSupportTwoPhaseDeletion(true); + managedLedgerConfig.setRetryDeleteIntervalSeconds(1); + managedLedgerConfig.setArchiveDataLimitSize(archiveLimit); + managedLedgerConfig.setMaxDeleteCount(3); + ManagedLedger ledger = factory.open("my_test_ledger", managedLedgerConfig); + + Field field = ManagedLedgerImpl.class.getDeclaredField("managedTrash"); + field.setAccessible(true); + ManagedTrashImpl managedTrash = (ManagedTrashImpl) field.get(ledger); + + + //ledgers with id >= 10000 fail to delete; see the mocked asyncDeleteLedger in setUpTestCase. + for (int i = 0; i < entryCount; i++) { + managedTrash.appendLedgerTrashData(10000 + i, null, ManagedTrash.LedgerType.LEDGER); + } + managedTrash.triggerDeleteInBackground(); + + Awaitility.await().untilAsserted(() -> { + assertEquals(persistedData.size(), 5); + }); + + CompletableFuture> index = managedTrash.getAllArchiveIndex(); + + assertEquals(index.get().size(), 3); + } + + @Test + public void testGetArchiveData() throws Exception { + int entryCount = 30; + int archiveLimit = 10; + + ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); + managedLedgerConfig.setSupportTwoPhaseDeletion(true); + managedLedgerConfig.setRetryDeleteIntervalSeconds(1); + managedLedgerConfig.setArchiveDataLimitSize(archiveLimit); + managedLedgerConfig.setMaxDeleteCount(3); + ManagedLedger ledger = factory.open("my_test_ledger", managedLedgerConfig); + + Field field = ManagedLedgerImpl.class.getDeclaredField("managedTrash"); + field.setAccessible(true); + ManagedTrashImpl managedTrash = (ManagedTrashImpl) field.get(ledger); + + + //ledgers with id >= 10000 fail to delete; see the mocked asyncDeleteLedger in setUpTestCase.
+ for (int i = 0; i < entryCount; i++) { + managedTrash.appendLedgerTrashData(10000 + i, null, ManagedTrash.LedgerType.LEDGER); + } + managedTrash.triggerDeleteInBackground(); + + Awaitility.await().untilAsserted(() -> { + assertEquals(persistedData.size(), 5); + }); + + CompletableFuture> indexFuture = managedTrash.getAllArchiveIndex(); + + List indexes = indexFuture.get(); + + Map trashedData = new ConcurrentSkipListMap<>(); + for (Long index : indexes) { + trashedData.putAll(managedTrash.getArchiveData(index).get()); + } + assertEquals(trashedData.size(), 30); + + int i = 0; + for (Map.Entry entry : trashedData.entrySet()) { + Assert.assertEquals(entry.getKey().getRetryCount(), 0); + Assert.assertEquals(entry.getKey().getLedgerId(), 10000 + i); + Assert.assertEquals(entry.getKey().getMsb(), 0); + Assert.assertEquals(entry.getKey().getType(), ManagedTrash.LedgerType.LEDGER); + i++; + } + } + + @Test + public void testAsyncCloseAfterAllTrashDataDeleteOnce() throws Exception { + int entryCount = 30; + int archiveLimit = 10; + int maxDeleteCount = 3; + + ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); + managedLedgerConfig.setSupportTwoPhaseDeletion(true); + managedLedgerConfig.setRetryDeleteIntervalSeconds(1); + managedLedgerConfig.setArchiveDataLimitSize(archiveLimit); + managedLedgerConfig.setMaxDeleteCount(maxDeleteCount); + ManagedLedger ledger = factory.open("my_test_ledger", managedLedgerConfig); + + Field field = ManagedLedgerImpl.class.getDeclaredField("managedTrash"); + field.setAccessible(true); + ManagedTrashImpl managedTrash = (ManagedTrashImpl) field.get(ledger); + + + //ledgers with id >= 10000 fail to delete; see the mocked asyncDeleteLedger in setUpTestCase. + for (int i = 0; i < entryCount; i++) { + long ledgerId = 10000 + i; + if (i % 2 == 0) { + managedTrash.appendLedgerTrashData(ledgerId, null, ManagedTrash.LedgerType.LEDGER); + } else { + //build offload ledger + UUID uuid = UUID.randomUUID(); + LedgerInfo ledgerInfo = buildOffloadLedgerInfo(ledgerId, uuid); + managedTrash.appendLedgerTrashData(ledgerId, ledgerInfo, ManagedTrash.LedgerType.OFFLOAD_LEDGER); + } + } + + try { + //every ledger will be deleted at least once.
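+ //the close call fences the state first; offload ledgers are excluded from fenced deletion, + //so their retry count is expected to stay at maxDeleteCount.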
+ managedTrash.asyncCloseAfterAllLedgerDeleteOnce().get(); + } catch (Exception e) { + Assert.fail(); + } + assertEquals(managedTrash.state, ManagedTrashImpl.State.Closed); + + Field field1 = ManagedTrashImpl.class.getDeclaredField("trashData"); + field1.setAccessible(true); + ConcurrentSkipListMap trashData = + (ConcurrentSkipListMap) field1.get(managedTrash); + + assertEquals(trashData.size(), 30); + + int offloadNotDeleteCount = 0; + for (Map.Entry entry : trashData.entrySet()) { + if (ManagedTrash.LedgerType.LEDGER == entry.getKey().getType()) { + assertTrue(entry.getKey().getRetryCount() < maxDeleteCount); + } + if (ManagedTrash.LedgerType.OFFLOAD_LEDGER == entry.getKey().getType()) { + assertTrue(entry.getKey().getRetryCount() == maxDeleteCount); + offloadNotDeleteCount++; + } + } + + assertEquals(offloadNotDeleteCount, 15); + + byte[] content = persistedData.get("/managed-trash/managed-ledger/my_test_ledger/delete"); + + NavigableMap persistedTrashData = managedTrash.deSerialize(content); + + assertEquals(persistedTrashData.size(), 30); + + int persistedSize = persistedTrashData.size(); + for (int i = 0; i < persistedSize; i++) { + Map.Entry entry1 = persistedTrashData.pollFirstEntry(); + Map.Entry entry2 = trashData.pollFirstEntry(); + assertEquals(entry1.getKey().compareTo(entry2.getKey()), 0); + } + } + + @Test + public void testRecover() throws Exception { + int entryCount = 30; + ConcurrentSkipListMap map = new ConcurrentSkipListMap<>(); + LedgerInfo emptyLedgerInfo = LedgerInfo.newBuilder().setLedgerId(-1).build(); + for (int i = 0; i < entryCount; i++) { + ManagedTrashImpl.TrashKey trashKey = new ManagedTrashImpl.TrashKey(3, i, 0, ManagedTrash.LedgerType.LEDGER); + map.put(trashKey, emptyLedgerInfo); + } + metadataStore.put("/managed-trash/managed-ledger/my_test_ledger/delete", ManagedTrashImpl.serialize(map), + Optional.of(-1L)); + ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); + managedLedgerConfig.setSupportTwoPhaseDeletion(true); + managedLedgerConfig.setMaxEntriesPerLedger(10); + managedLedgerConfig.setRetentionTime(1, TimeUnit.SECONDS); + ManagedLedger ledger = factory.open("my_test_ledger", managedLedgerConfig); + + Field field = ManagedLedgerImpl.class.getDeclaredField("managedTrash"); + field.setAccessible(true); + ManagedTrashImpl managedTrash = (ManagedTrashImpl) field.get(ledger); + + Field field1 = ManagedTrashImpl.class.getDeclaredField("trashData"); + field1.setAccessible(true); + ConcurrentSkipListMap trashData = + (ConcurrentSkipListMap) field1.get(managedTrash); + + assertEquals(trashData.size(), entryCount); + + for (int i = 0; i < entryCount; i++) { + Map.Entry entry1 = map.pollFirstEntry(); + Map.Entry entry2 = trashData.pollFirstEntry(); + assertEquals(entry1.getKey().getLedgerId(), entry2.getKey().getLedgerId()); + } + } +} diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 6639fe0cb3513..f1de1197072e7 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2514,6 +2514,36 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private int managedLedgerInactiveLedgerRolloverTimeSeconds = 0; + @FieldContext( + dynamic = true, + category = CATEGORY_STORAGE_ML, + doc = "Use two-phase deletion when deleting a ledger. If true, " + + "managedTrash takes over ledger deletion.
(Default false)" + ) + private boolean managedLedgerSupportTwoPhaseDeletion; + + @FieldContext( + dynamic = true, + category = CATEGORY_STORAGE_ML, + doc = "To control managedTrash archive data size. If reach, persist archive data. (Default 500)" + ) + private int managedTrashArchiveDataLimitSize = 500; + + @FieldContext( + dynamic = true, + category = CATEGORY_STORAGE_ML, + doc = "The interval of retry delete same ledger again. (Default 60 seconds)" + ) + private int managedTrashRetryDeleteInternalSeconds = 60; + + @FieldContext( + dynamic = true, + category = CATEGORY_STORAGE_ML, + doc = "The max delete count of delete same ledger or offload ledger in managedTrash. " + + "If a ledger delete count reach it, the ledger will be archived. (Default is 5)" + ) + private int managedLedgerMaxDeleteCount = 5; + @FieldContext( category = CATEGORY_STORAGE_ML, doc = "Evicting cache data by the slowest markDeletedPosition or readPosition. " diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index bf21b0a6fdc3d..954ec07a86e39 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1590,6 +1590,12 @@ public CompletableFuture getManagedLedgerConfig(TopicName t serviceConfig.getManagedLedgerInactiveLedgerRolloverTimeSeconds(), TimeUnit.SECONDS); managedLedgerConfig.setCacheEvictionByMarkDeletedPosition( serviceConfig.isCacheEvictionByMarkDeletedPosition()); + managedLedgerConfig.setSupportTwoPhaseDeletion( + serviceConfig.isManagedLedgerSupportTwoPhaseDeletion()); + managedLedgerConfig.setArchiveDataLimitSize(serviceConfig.getManagedTrashArchiveDataLimitSize()); + managedLedgerConfig.setRetryDeleteIntervalSeconds( + serviceConfig.getManagedTrashRetryDeleteInternalSeconds()); + managedLedgerConfig.setMaxDeleteCount(serviceConfig.getManagedLedgerMaxDeleteCount()); OffloadPoliciesImpl nsLevelOffloadPolicies = (OffloadPoliciesImpl) policies.map(p -> p.offload_policies).orElse(null); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java index cac386405a64c..c173395e2b701 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java @@ -340,7 +340,7 @@ protected static String parent(String path) { * 2. starts with '/' * 3. 
not ends with '/', except root path "/" */ - static boolean isValidPath(String path) { + protected static boolean isValidPath(String path) { return StringUtils.equals(path, "/") || StringUtils.isNotBlank(path) && path.startsWith("/") diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java index 50df4b6b0df0f..802e924326cd0 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java @@ -75,7 +75,7 @@ public ClientConfiguration getConf() { return super.getConf(); } - final Map ledgers = new ConcurrentHashMap<>(); + protected final Map ledgers = new ConcurrentHashMap<>(); final AtomicLong sequence = new AtomicLong(3); CompletableFuture defaultResponse = CompletableFuture.completedFuture(null); @@ -317,7 +317,7 @@ synchronized boolean checkReturnEmptyLedger() { return shouldFailNow; } - synchronized CompletableFuture getProgrammedFailure() { + protected synchronized CompletableFuture getProgrammedFailure() { return failures.isEmpty() ? defaultResponse : failures.remove(0); } @@ -351,7 +351,7 @@ public synchronized void addEntryDelay(long delay, TimeUnit unit) { addEntryDelaysMillis.add(unit.toMillis(delay)); } - static int getExceptionCode(Throwable t) { + protected static int getExceptionCode(Throwable t) { if (t instanceof BKException) { return ((BKException) t).getCode(); } else if (t.getCause() != null) {
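A minimal usage sketch of the configuration surface introduced by this patch (the factory wiring mirrors ManagedTrashTest#setUpTestCase; metadataStore and bkc are assumed to already be in scope, and the topic name is illustrative only):

import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;

// Enable two-phase deletion: ledger deletion is handed to ManagedTrash, which
// retries failed deletes and archives entries that exhaust their retry budget.
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setSupportTwoPhaseDeletion(true);
config.setRetryDeleteIntervalSeconds(60); // wait 60s between delete attempts of one ledger
config.setMaxDeleteCount(5);              // archive a ledger after 5 failed attempts
config.setArchiveDataLimitSize(500);      // persist archive data once 500 entries accumulate

ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
ManagedLedger ledger = factory.open("my_test_ledger", config);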