From 55f47089ab1b13cae26635f01c470861e70a64a9 Mon Sep 17 00:00:00 2001 From: frankxieke Date: Sat, 9 Oct 2021 17:46:31 +0800 Subject: [PATCH] [offload] fix FileSystemManagedLedgerOffloader can not cleanup outdated ledger data --- .../mledger/impl/ManagedLedgerImpl.java | 6 +- .../mledger/impl/OffloadLedgerDeleteTest.java | 149 ++++++++++++++++++ 2 files changed, 154 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 47f38a952682e..8f542eaba7c7f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3083,9 +3083,13 @@ private void cleanupOffloaded(long ledgerId, UUID uuid, String offloadDriverName Map offloadDriverMetadata, String cleanupReason) { log.info("[{}] Cleanup offload for ledgerId {} uuid {} because of the reason {}.", name, ledgerId, uuid.toString(), cleanupReason); + Map metadataMap = Maps.newHashMap(); + metadataMap.putAll(offloadDriverMetadata); + metadataMap.put("ManagedLedgerName", name); + Retries.run(Backoff.exponentialJittered(TimeUnit.SECONDS.toMillis(1), TimeUnit.SECONDS.toHours(1)).limit(10), Retries.NonFatalPredicate, - () -> config.getLedgerOffloader().deleteOffloaded(ledgerId, uuid, offloadDriverMetadata), + () -> config.getLedgerOffloader().deleteOffloaded(ledgerId, uuid, metadataMap), scheduledExecutor, name).whenComplete((ignored, exception) -> { if (exception != null) { log.warn("[{}] Error cleaning up offload for {}, (cleanup reason: {})", diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java index f25332e91703a..e36835f2efbfc 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java @@ -20,11 +20,16 @@ import static org.apache.bookkeeper.mledger.impl.OffloadPrefixTest.assertEventuallyTrue; +import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; @@ -43,6 +48,102 @@ public class OffloadLedgerDeleteTest extends MockedBookKeeperTestCase { private static final Logger log = LoggerFactory.getLogger(OffloadLedgerDeleteTest.class); + + static class MockFileSystemLedgerOffloader implements LedgerOffloader { + interface InjectAfterOffload { + void call(); + } + + private String storageBasePath = "/Users/pulsar_filesystem_offloader"; + + private static String getStoragePath(String storageBasePath, String managedLedgerName) { + return storageBasePath == null ? managedLedgerName + "/" : storageBasePath + "/" + managedLedgerName + "/"; + } + + private static String getDataFilePath(String storagePath, long ledgerId, UUID uuid) { + return storagePath + ledgerId + "-" + uuid.toString(); + } + + ConcurrentHashMap offloads = new ConcurrentHashMap(); + ConcurrentHashMap deletes = new ConcurrentHashMap(); + OffloadPrefixTest.MockLedgerOffloader.InjectAfterOffload inject = null; + + Set offloadedLedgers() { + return offloads.keySet(); + } + + Set deletedOffloads() { + return deletes.keySet(); + } + + OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create("filesystem", "", "", "", + null, null, + null, null, + OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES, + OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES, + OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES, + OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS, + OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY); + + @Override + public String getOffloadDriverName() { + return "mockfilesystem"; + } + + @Override + public CompletableFuture offload(ReadHandle ledger, + UUID uuid, + Map extraMetadata) { + Assert.assertNotNull(extraMetadata.get("ManagedLedgerName")); + String storagePath = getStoragePath(storageBasePath, extraMetadata.get("ManagedLedgerName")); + String dataFilePath = getDataFilePath(storagePath, ledger.getId(), uuid); + CompletableFuture promise = new CompletableFuture<>(); + if (offloads.putIfAbsent(ledger.getId(), dataFilePath) == null) { + promise.complete(null); + } else { + promise.completeExceptionally(new Exception("Already exists exception")); + } + + if (inject != null) { + inject.call(); + } + return promise; + } + + @Override + public CompletableFuture readOffloaded(long ledgerId, UUID uuid, + Map offloadDriverMetadata) { + CompletableFuture promise = new CompletableFuture<>(); + promise.completeExceptionally(new UnsupportedOperationException()); + return promise; + } + + @Override + public CompletableFuture deleteOffloaded(long ledgerId, UUID uuid, + Map offloadDriverMetadata) { + Assert.assertNotNull(offloadDriverMetadata.get("ManagedLedgerName")); + String storagePath = getStoragePath(storageBasePath, offloadDriverMetadata.get("ManagedLedgerName")); + String dataFilePath = getDataFilePath(storagePath, ledgerId, uuid); + CompletableFuture promise = new CompletableFuture<>(); + if (offloads.remove(ledgerId, dataFilePath)) { + deletes.put(ledgerId, dataFilePath); + promise.complete(null); + } else { + promise.completeExceptionally(new Exception("Not found")); + } + return promise; + }; + + @Override + public OffloadPoliciesImpl getOffloadPolicies() { + return offloadPolicies; + } + + @Override + public void close() { + } + } + @Test public void testLaggedDelete() throws Exception { OffloadPrefixTest.MockLedgerOffloader offloader = new OffloadPrefixTest.MockLedgerOffloader(); @@ -105,6 +206,54 @@ public void testLaggedDelete() throws Exception { assertEventuallyTrue(() -> offloader.deletedOffloads().contains(firstLedgerId)); } + @Test(timeOut = 5000) + public void testFileSystemOffloadDeletePath() throws Exception { + MockFileSystemLedgerOffloader offloader = new MockFileSystemLedgerOffloader(); + + ManagedLedgerConfig config = new ManagedLedgerConfig(); + MockClock clock = new MockClock(); + config.setMaxEntriesPerLedger(10); + config.setMinimumRolloverTime(0, TimeUnit.SECONDS); + config.setRetentionTime(3, TimeUnit.MINUTES); + config.setRetentionSizeInMB(10); + offloader.getOffloadPolicies().setManagedLedgerOffloadDeletionLagInMillis(300000L); + config.setLedgerOffloader(offloader); + config.setClock(clock); + + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger_filesystem", config); + int i = 0; + for (; i < 15; i++) { + String content = "entry-" + i; + ledger.addEntry(content.getBytes()); + } + Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 2); + long firstLedgerId = ledger.getLedgersInfoAsList().get(0).getLedgerId(); + + ledger.offloadPrefix(ledger.getLastConfirmedEntry()); + + Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 2); + Assert.assertEquals(ledger.getLedgersInfoAsList().stream() + .filter(e -> e.getOffloadContext().getComplete()) + .map(e -> e.getLedgerId()).collect(Collectors.toSet()), + offloader.offloadedLedgers()); + Assert.assertTrue(bkc.getLedgers().contains(firstLedgerId)); + + // ledger still exists in list + Assert.assertEquals(ledger.getLedgersInfoAsList().stream() + .filter(e -> e.getOffloadContext().getComplete()) + .map(e -> e.getLedgerId()).collect(Collectors.toSet()), + offloader.offloadedLedgers()); + + // move past retention, should be deleted from offloaded also + clock.advance(5, TimeUnit.MINUTES); + CompletableFuture promise3 = new CompletableFuture<>(); + ledger.internalTrimConsumedLedgers(promise3); + promise3.join(); + + Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 1); + assertEventuallyTrue(() -> offloader.deletedOffloads().contains(firstLedgerId)); + } + @Test public void testLaggedDeleteRetentionSetLower() throws Exception { OffloadPrefixTest.MockLedgerOffloader offloader = new OffloadPrefixTest.MockLedgerOffloader();