Skip to content

Commit

Permalink
Added test to repro the snapshot chain corruption.
Browse files Browse the repository at this point in the history
  • Loading branch information
hemantk-12 committed Mar 26, 2024
1 parent e537e1e commit 759abe2
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerDoubleBuffer;
import org.apache.hadoop.ozone.om.service.KeyDeletingService;
import org.apache.hadoop.ozone.om.service.SnapshotDeletingService;
import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
import org.apache.ozone.test.GenericTestUtils;
Expand All @@ -46,8 +48,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
Expand All @@ -60,8 +65,12 @@
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_TIMEOUT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPath;
import static org.apache.hadoop.ozone.om.helpers.SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* Test Snapshot Deleting Service.
Expand Down Expand Up @@ -101,6 +110,8 @@ public void setup() throws Exception {
conf.setBoolean(OZONE_ACL_ENABLED, true);
// Enable filesystem snapshot feature for the test regardless of the default
conf.setBoolean(OMConfigKeys.OZONE_FILESYSTEM_SNAPSHOT_ENABLED_KEY, true);
conf.setTimeDuration(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, -1, TimeUnit.MILLISECONDS);

cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(3)
.build();
Expand Down Expand Up @@ -562,4 +573,80 @@ private boolean assertTableRowCount(int expectedCount,
});
return count.get() == expectedCount;
}

@Test
void testSnapshotDeletingAndKeyDeletingServiceSync() throws IOException, InterruptedException, TimeoutException {
KeyDeletingService keyDeletingService = cluster.getOzoneManager().getKeyManager().getDeletingService();
SnapshotDeletingService snapshotDeletingService =
cluster.getOzoneManager().getKeyManager().getSnapshotDeletingService();
OzoneManagerDoubleBuffer doubleBuffer =
cluster.getOzoneManager().getOmRatisServer().getOmStateMachine().getOzoneManagerDoubleBuffer();

List<String> snapshots = Arrays.asList("snap-a", "snap-b", "snap-c");

keyDeletingService.suspend();
createSnapshots(snapshots);
keyDeletingService.resume();
keyDeletingService.pauseSubmitToOm();
Thread.sleep(1000);

long initialKeyDeletingServiceCount = keyDeletingService.getRunCount().get();

String snapshotToDelete = snapshots.get(1);

snapshotDeletingService.suspend();
Thread.sleep(1000);
long initialSnapshotDeletingServiceCount = snapshotDeletingService.getRunCount().get();

client.getProxy().deleteSnapshot(VOLUME_NAME, BUCKET_NAME_ONE, snapshotToDelete);

Thread.sleep(1000);

// Wait till doubleBuffer flushes all the pending transactions.
GenericTestUtils.waitFor(() -> doubleBuffer.getCurrentBufferSize() == 0, 1000, 10_000);

Thread.sleep(1000);
doubleBuffer.pause();

// Validate that RocksDB has been updated.
assertEquals(SNAPSHOT_DELETED, cluster.getOzoneManager().getMetadataManager().getSnapshotInfoTable()
.getSkipCache(SnapshotInfo.getTableKey(VOLUME_NAME, BUCKET_NAME_ONE, snapshotToDelete)).getSnapshotStatus());

snapshotDeletingService.resume();
keyDeletingService.resumeSubmitToOm();

// Wait for snapshotDeletingService to run once after resuming.
GenericTestUtils.waitFor(
() -> snapshotDeletingService.getRunCount().get() > initialSnapshotDeletingServiceCount, 100, 10_000);
// Wait for keyDeletingService to run once after resuming.
GenericTestUtils.waitFor(() -> keyDeletingService.getRunCount().get() > initialKeyDeletingServiceCount,
100, 10_000);

snapshotDeletingService.suspend();
keyDeletingService.pauseSubmitToOm();

doubleBuffer.resume();

// Wait till doubleBuffer flushes all the pending transactions.
GenericTestUtils.waitFor(() -> doubleBuffer.getCurrentBufferSize() == 0, 1000, 60_000);
assertTableRowCount(cluster.getOzoneManager().getMetadataManager().getSnapshotInfoTable(), 2);
}

private List<SnapshotInfo> createSnapshots(List<String> snapshotNames)
throws IOException, InterruptedException, TimeoutException {
List<SnapshotInfo> snapshotInfos = new ArrayList<>();
for (String snapshotName : snapshotNames) {
client.getProxy().createSnapshot(VOLUME_NAME, BUCKET_NAME_ONE, snapshotName);
SnapshotInfo snapshotInfo = cluster.getOzoneManager().getMetadataManager()
.getSnapshotInfoTable()
.get(SnapshotInfo.getTableKey(VOLUME_NAME, BUCKET_NAME_ONE, snapshotName));

// Allow the snapshot to be written to disk
String fileName = getSnapshotPath(cluster.getOzoneManager().getConfiguration(), snapshotInfo);
File snapshotDir = new File(fileName);
GenericTestUtils.waitFor(snapshotDir::exists, 100, 120_000);
snapshotInfos.add(snapshotInfo);
}
return snapshotInfos;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -362,13 +362,13 @@ public synchronized void updateSnapshot(SnapshotInfo snapshotInfo) {
public synchronized boolean deleteSnapshot(SnapshotInfo snapshotInfo)
throws IOException {
validateSnapshotChain();
boolean status = deleteSnapshotGlobal(snapshotInfo.getSnapshotId()) &&
return deleteSnapshotGlobal(snapshotInfo.getSnapshotId()) &&
deleteSnapshotPath(snapshotInfo.getSnapshotPath(),
snapshotInfo.getSnapshotId());
if (status) {
snapshotIdToTableKey.remove(snapshotInfo.getSnapshotId());
}
return status;
}

public synchronized void deleteSnapshotFromSnapshotIdToTableKey(UUID snapshotId) {
snapshotIdToTableKey.remove(snapshotId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,10 @@ private void addToBatchTransactionInfoWithTrace(String parentName,
*/
@VisibleForTesting
void flushTransactions() {
while (isRunning.get() && canFlush()) {
flushCurrentBuffer();
while (canFlush()) {
if (isRunning.get()) {
flushCurrentBuffer();
}
}
}

Expand Down Expand Up @@ -608,7 +610,7 @@ long getFlushIterationsForTesting() {
return flushIterations.get();
}

int getCurrentBufferSize() {
public int getCurrentBufferSize() {
return currentBuffer.size();
}

Expand All @@ -617,8 +619,13 @@ int getReadyBufferSize() {
}

@VisibleForTesting
void resume() {
isRunning.set(true);
public void pause() {
isRunning.compareAndSet(true, false);
}

@VisibleForTesting
public void resume() {
isRunning.compareAndSet(false, true);
}

CompletableFuture<Integer> awaitFlushAsync() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,9 @@ protected void addToDBBatch(OMMetadataManager omMetadataManager,

// Delete Snapshot checkpoint directory.
deleteCheckpointDirectory(omMetadataManager, snapshotInfo);
omMetadataManager.getSnapshotInfoTable().deleteWithBatch(batchOperation,
dbKey);
omMetadataManager.getSnapshotInfoTable().deleteWithBatch(batchOperation, dbKey);
((OmMetadataManagerImpl) omMetadataManager).getSnapshotChainManager()
.deleteSnapshotFromSnapshotIdToTableKey(snapshotInfo.getSnapshotId());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,8 +486,9 @@ private void updateDeepCleanedSnapshots(List<String> deepCleanedSnapshots) {
.setSetSnapshotPropertyRequest(setSnapshotPropertyRequest)
.setClientId(clientId.toString())
.build();

submitRequest(omRequest, clientId);
if (canSubmitRequest()) {
submitRequest(omRequest, clientId);
}
}
}

Expand Down Expand Up @@ -517,4 +518,26 @@ public void submitRequest(OMRequest omRequest, ClientId clientId) {
}
}
}

private final AtomicBoolean submitToOm = new AtomicBoolean(true);

public boolean canSubmitRequest() {
while (!submitToOm.get()) {
try {
Thread.sleep(1000L);
} catch (InterruptedException exception) {
throw new RuntimeException(exception);
}
}
return true;
}

public void pauseSubmitToOm() {
submitToOm.compareAndSet(true, false);
}

public void resumeSubmitToOm() {
submitToOm.compareAndSet(false, true);

}
}

0 comments on commit 759abe2

Please sign in to comment.