Skip to content

Commit

Permalink
[offload] fix FileSystemManagedLedgerOffloader can not cleanup outdat…
Browse files Browse the repository at this point in the history
…ed ledger data
  • Loading branch information
frankxieke committed Oct 12, 2021
1 parent 8b50af5 commit 55f4708
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -3083,9 +3083,13 @@ private void cleanupOffloaded(long ledgerId, UUID uuid, String offloadDriverName
Map<String, String> offloadDriverMetadata, String cleanupReason) {
log.info("[{}] Cleanup offload for ledgerId {} uuid {} because of the reason {}.",
name, ledgerId, uuid.toString(), cleanupReason);
Map<String, String> metadataMap = Maps.newHashMap();
metadataMap.putAll(offloadDriverMetadata);
metadataMap.put("ManagedLedgerName", name);

Retries.run(Backoff.exponentialJittered(TimeUnit.SECONDS.toMillis(1), TimeUnit.SECONDS.toHours(1)).limit(10),
Retries.NonFatalPredicate,
() -> config.getLedgerOffloader().deleteOffloaded(ledgerId, uuid, offloadDriverMetadata),
() -> config.getLedgerOffloader().deleteOffloaded(ledgerId, uuid, metadataMap),
scheduledExecutor, name).whenComplete((ignored, exception) -> {
if (exception != null) {
log.warn("[{}] Error cleaning up offload for {}, (cleanup reason: {})",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,16 @@

import static org.apache.bookkeeper.mledger.impl.OffloadPrefixTest.assertEventuallyTrue;

import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
Expand All @@ -43,6 +48,102 @@
public class OffloadLedgerDeleteTest extends MockedBookKeeperTestCase {
private static final Logger log = LoggerFactory.getLogger(OffloadLedgerDeleteTest.class);


static class MockFileSystemLedgerOffloader implements LedgerOffloader {
interface InjectAfterOffload {
void call();
}

private String storageBasePath = "/Users/pulsar_filesystem_offloader";

private static String getStoragePath(String storageBasePath, String managedLedgerName) {
return storageBasePath == null ? managedLedgerName + "/" : storageBasePath + "/" + managedLedgerName + "/";
}

private static String getDataFilePath(String storagePath, long ledgerId, UUID uuid) {
return storagePath + ledgerId + "-" + uuid.toString();
}

ConcurrentHashMap<Long, String> offloads = new ConcurrentHashMap<Long, String>();
ConcurrentHashMap<Long, String> deletes = new ConcurrentHashMap<Long, String>();
OffloadPrefixTest.MockLedgerOffloader.InjectAfterOffload inject = null;

Set<Long> offloadedLedgers() {
return offloads.keySet();
}

Set<Long> deletedOffloads() {
return deletes.keySet();
}

OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create("filesystem", "", "", "",
null, null,
null, null,
OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES,
OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS,
OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY);

@Override
public String getOffloadDriverName() {
return "mockfilesystem";
}

@Override
public CompletableFuture<Void> offload(ReadHandle ledger,
UUID uuid,
Map<String, String> extraMetadata) {
Assert.assertNotNull(extraMetadata.get("ManagedLedgerName"));
String storagePath = getStoragePath(storageBasePath, extraMetadata.get("ManagedLedgerName"));
String dataFilePath = getDataFilePath(storagePath, ledger.getId(), uuid);
CompletableFuture<Void> promise = new CompletableFuture<>();
if (offloads.putIfAbsent(ledger.getId(), dataFilePath) == null) {
promise.complete(null);
} else {
promise.completeExceptionally(new Exception("Already exists exception"));
}

if (inject != null) {
inject.call();
}
return promise;
}

@Override
public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uuid,
Map<String, String> offloadDriverMetadata) {
CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
promise.completeExceptionally(new UnsupportedOperationException());
return promise;
}

@Override
public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uuid,
Map<String, String> offloadDriverMetadata) {
Assert.assertNotNull(offloadDriverMetadata.get("ManagedLedgerName"));
String storagePath = getStoragePath(storageBasePath, offloadDriverMetadata.get("ManagedLedgerName"));
String dataFilePath = getDataFilePath(storagePath, ledgerId, uuid);
CompletableFuture<Void> promise = new CompletableFuture<>();
if (offloads.remove(ledgerId, dataFilePath)) {
deletes.put(ledgerId, dataFilePath);
promise.complete(null);
} else {
promise.completeExceptionally(new Exception("Not found"));
}
return promise;
};

@Override
public OffloadPoliciesImpl getOffloadPolicies() {
return offloadPolicies;
}

@Override
public void close() {
}
}

@Test
public void testLaggedDelete() throws Exception {
OffloadPrefixTest.MockLedgerOffloader offloader = new OffloadPrefixTest.MockLedgerOffloader();
Expand Down Expand Up @@ -105,6 +206,54 @@ public void testLaggedDelete() throws Exception {
assertEventuallyTrue(() -> offloader.deletedOffloads().contains(firstLedgerId));
}

@Test(timeOut = 5000)
public void testFileSystemOffloadDeletePath() throws Exception {
MockFileSystemLedgerOffloader offloader = new MockFileSystemLedgerOffloader();

ManagedLedgerConfig config = new ManagedLedgerConfig();
MockClock clock = new MockClock();
config.setMaxEntriesPerLedger(10);
config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
config.setRetentionTime(3, TimeUnit.MINUTES);
config.setRetentionSizeInMB(10);
offloader.getOffloadPolicies().setManagedLedgerOffloadDeletionLagInMillis(300000L);
config.setLedgerOffloader(offloader);
config.setClock(clock);

ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger_filesystem", config);
int i = 0;
for (; i < 15; i++) {
String content = "entry-" + i;
ledger.addEntry(content.getBytes());
}
Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 2);
long firstLedgerId = ledger.getLedgersInfoAsList().get(0).getLedgerId();

ledger.offloadPrefix(ledger.getLastConfirmedEntry());

Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 2);
Assert.assertEquals(ledger.getLedgersInfoAsList().stream()
.filter(e -> e.getOffloadContext().getComplete())
.map(e -> e.getLedgerId()).collect(Collectors.toSet()),
offloader.offloadedLedgers());
Assert.assertTrue(bkc.getLedgers().contains(firstLedgerId));

// ledger still exists in list
Assert.assertEquals(ledger.getLedgersInfoAsList().stream()
.filter(e -> e.getOffloadContext().getComplete())
.map(e -> e.getLedgerId()).collect(Collectors.toSet()),
offloader.offloadedLedgers());

// move past retention, should be deleted from offloaded also
clock.advance(5, TimeUnit.MINUTES);
CompletableFuture<Void> promise3 = new CompletableFuture<>();
ledger.internalTrimConsumedLedgers(promise3);
promise3.join();

Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 1);
assertEventuallyTrue(() -> offloader.deletedOffloads().contains(firstLedgerId));
}

@Test
public void testLaggedDeleteRetentionSetLower() throws Exception {
OffloadPrefixTest.MockLedgerOffloader offloader = new OffloadPrefixTest.MockLedgerOffloader();
Expand Down

0 comments on commit 55f4708

Please sign in to comment.