Skip to content

Commit

Permalink
HDDS-11068. Move SstFiltered flag to a file in the snapshot directory (
Browse files Browse the repository at this point in the history
  • Loading branch information
swamirishi authored and devabhishekpal committed Aug 8, 2024
1 parent c0b5483 commit 9c4bf8b
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.hadoop.ozone.om.OmFailoverProxyUtil;
import org.apache.hadoop.ozone.om.OmSnapshot;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.SstFilteringService;
import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
Expand Down Expand Up @@ -572,7 +573,7 @@ private void checkIfSnapshotGetsProcessedBySFS(OzoneManager ozoneManager)
} catch (IOException e) {
fail();
}
return snapshotInfo.isSstFiltered();
return SstFilteringService.isSstFiltered(ozoneManager.getConfiguration(), snapshotInfo);
}, 1000, 10000);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadsBucket;
import org.apache.hadoop.ozone.snapshot.ListSnapshotResponse;
import org.apache.hadoop.ozone.storage.proto.
OzoneManagerStorageProtos.PersistedUserVolumeInfo;
import org.apache.hadoop.ozone.storage.proto.OzoneManagerStorageProtos.PersistedUserVolumeInfo;
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.Table;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,13 @@ public static String getSnapshotPrefix(String snapshotName) {
snapshotName + OM_KEY_PREFIX;
}

/**
 * Builds the path of a snapshot's checkpoint directory from the metadata manager's store.
 * The result is the store's snapshots parent directory joined with a directory name made of
 * the DB location's file name followed by the snapshot's checkpoint dir value.
 *
 * @param omMetadataManager metadata manager whose store is cast to {@link RDBStore}
 * @param snapshotInfo snapshot whose checkpoint dir value is appended to the DB name
 * @return path of the snapshot directory
 */
public static Path getSnapshotPath(OMMetadataManager omMetadataManager, SnapshotInfo snapshotInfo) {
  final RDBStore rdbStore = (RDBStore) omMetadataManager.getStore();
  final String dbDirName = rdbStore.getDbLocation().getName();
  final String snapshotsParentDir = rdbStore.getSnapshotsParentDir();
  final String snapshotDirName = dbDirName + snapshotInfo.getCheckpointDir();
  return Paths.get(snapshotsParentDir, snapshotDirName);
}

public static String getSnapshotPath(OzoneConfiguration conf,
SnapshotInfo snapshotInfo) {
return OMStorage.getOmDbDir(conf) +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hadoop.ozone.om;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.BackgroundService;
import org.apache.hadoop.hdds.utils.BackgroundTask;
Expand All @@ -38,6 +39,9 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -69,6 +73,10 @@ public class SstFilteringService extends BackgroundService
// multiple times.
private static final int SST_FILTERING_CORE_POOL_SIZE = 1;

public static final String SST_FILTERED_FILE = "sstFiltered";
private static final byte[] SST_FILTERED_FILE_CONTENT = StringUtils.string2Bytes("This file holds information " +
"if a particular snapshot has filtered out the relevant sst files or not.\nDO NOT add, change or delete " +
"any files in this directory unless you know what you are doing.\n");
private final OzoneManager ozoneManager;

// Number of files to be batched in an iteration.
Expand All @@ -78,6 +86,12 @@ public class SstFilteringService extends BackgroundService

private AtomicBoolean running;

/**
 * Tells whether a snapshot counts as SST filtered: either the flag on the given
 * {@link SnapshotInfo} is set, or the {@link #SST_FILTERED_FILE} marker exists under the
 * snapshot path resolved from the configuration.
 *
 * @param ozoneConfiguration configuration used to resolve the snapshot path
 * @param snapshotInfo snapshot to check
 * @return true if the flag is set or the marker file exists
 */
public static boolean isSstFiltered(OzoneConfiguration ozoneConfiguration, SnapshotInfo snapshotInfo) {
  String snapshotDir = OmSnapshotManager.getSnapshotPath(ozoneConfiguration, snapshotInfo);
  Path markerFile = Paths.get(snapshotDir, SST_FILTERED_FILE);
  if (snapshotInfo.isSstFiltered()) {
    return true;
  }
  return markerFile.toFile().exists();
}

public SstFilteringService(long interval, TimeUnit unit, long serviceTimeout,
OzoneManager ozoneManager, OzoneConfiguration configuration) {
super("SstFilteringService", interval, unit, SST_FILTERING_CORE_POOL_SIZE,
Expand Down Expand Up @@ -112,33 +126,35 @@ public void resume() {

private class SstFilteringTask implements BackgroundTask {

/**
 * Tells whether a snapshot should be treated as deleted.
 *
 * @param snapshotInfo snapshot info to inspect; may be null
 * @return true if {@code snapshotInfo} is null or its status is SNAPSHOT_DELETED
 */
private boolean isSnapshotDeleted(SnapshotInfo snapshotInfo) {
  if (snapshotInfo == null) {
    return true;
  }
  return SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED == snapshotInfo.getSnapshotStatus();
}


/**
* Marks the SSTFiltered flag corresponding to the snapshot.
* @param volume Volume name of the snapshot
* @param bucket Bucket name of the snapshot
* @param snapshotName Snapshot name
* Marks the snapshot as SSTFiltered by creating a file in snapshot directory.
* @param snapshotInfo snapshotInfo
* @throws IOException
*/
private void markSSTFilteredFlagForSnapshot(String volume, String bucket,
String snapshotName) throws IOException {
private void markSSTFilteredFlagForSnapshot(SnapshotInfo snapshotInfo) throws IOException {
// Acquiring read lock to avoid race condition with the snapshot directory deletion occurring
// in OmSnapshotPurgeResponse. Any operation apart from delete can run in parallel along with this operation.
// TODO: Revisit other SNAPSHOT_LOCK usages and check whether write locks can be changed to read locks.
OMLockDetails omLockDetails = ozoneManager.getMetadataManager().getLock()
.acquireWriteLock(SNAPSHOT_LOCK, volume, bucket, snapshotName);
.acquireReadLock(SNAPSHOT_LOCK, snapshotInfo.getVolumeName(), snapshotInfo.getBucketName(),
snapshotInfo.getName());
boolean acquiredSnapshotLock = omLockDetails.isLockAcquired();
if (acquiredSnapshotLock) {
Table<String, SnapshotInfo> snapshotInfoTable =
ozoneManager.getMetadataManager().getSnapshotInfoTable();
String snapshotDir = OmSnapshotManager.getSnapshotPath(ozoneManager.getConfiguration(), snapshotInfo);
try {
// mark the snapshot as filtered by writing to the file
String snapshotTableKey = SnapshotInfo.getTableKey(volume, bucket,
snapshotName);
SnapshotInfo snapshotInfo = snapshotInfoTable.get(snapshotTableKey);

snapshotInfo.setSstFiltered(true);
snapshotInfoTable.put(snapshotTableKey, snapshotInfo);
// mark the snapshot as filtered by creating a file.
if (Files.exists(Paths.get(snapshotDir))) {
Files.write(Paths.get(snapshotDir, SST_FILTERED_FILE), SST_FILTERED_FILE_CONTENT);
}
} finally {
ozoneManager.getMetadataManager().getLock()
.releaseWriteLock(SNAPSHOT_LOCK, volume, bucket, snapshotName);
.releaseReadLock(SNAPSHOT_LOCK, snapshotInfo.getVolumeName(),
snapshotInfo.getBucketName(), snapshotInfo.getName());
}
}
}
Expand All @@ -163,12 +179,11 @@ public BackgroundTaskResult call() throws Exception {
long snapshotLimit = snapshotLimitPerTask;

while (iterator.hasNext() && snapshotLimit > 0 && running.get()) {
Table.KeyValue<String, SnapshotInfo> keyValue = iterator.next();
String snapShotTableKey = keyValue.getKey();
SnapshotInfo snapshotInfo = keyValue.getValue();
try {
Table.KeyValue<String, SnapshotInfo> keyValue = iterator.next();
String snapShotTableKey = keyValue.getKey();
SnapshotInfo snapshotInfo = keyValue.getValue();

if (snapshotInfo.isSstFiltered()) {
if (isSstFiltered(ozoneManager.getConfiguration(), snapshotInfo)) {
continue;
}

Expand All @@ -194,6 +209,9 @@ public BackgroundTaskResult call() throws Exception {
.lock()) {
db.deleteFilesNotMatchingPrefix(columnFamilyNameToPrefixMap);
}
markSSTFilteredFlagForSnapshot(snapshotInfo);
snapshotLimit--;
snapshotFilteredCount.getAndIncrement();
} catch (OMException ome) {
// FILE_NOT_FOUND is obtained when the snapshot is deleted
// In this case, get the snapshotInfo from the db, check if
Expand All @@ -202,20 +220,22 @@ public BackgroundTaskResult call() throws Exception {
SnapshotInfo snapshotInfoToCheck =
ozoneManager.getMetadataManager().getSnapshotInfoTable()
.get(snapShotTableKey);
if (snapshotInfoToCheck.getSnapshotStatus() ==
SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED) {
if (isSnapshotDeleted(snapshotInfoToCheck)) {
LOG.info("Snapshot with name: '{}', id: '{}' has been " +
"deleted.", snapshotInfo.getName(), snapshotInfo
.getSnapshotId());
}
}
}
markSSTFilteredFlagForSnapshot(snapshotInfo.getVolumeName(),
snapshotInfo.getBucketName(), snapshotInfo.getName());
snapshotLimit--;
snapshotFilteredCount.getAndIncrement();
} catch (RocksDBException | IOException e) {
LOG.error("Exception encountered while filtering a snapshot", e);
if (isSnapshotDeleted(snapshotInfoTable.get(snapShotTableKey))) {
LOG.info("Exception encountered while filtering a snapshot: {} since it was deleted midway",
snapShotTableKey, e);
} else {
LOG.error("Exception encountered while filtering a snapshot", e);
}


}
}
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.RDBStore;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.OmSnapshotManager;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.hadoop.ozone.om.lock.OMLockDetails;
import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
Expand All @@ -33,11 +34,11 @@
import jakarta.annotation.Nonnull;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;

import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.SNAPSHOT_INFO_TABLE;
import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.SNAPSHOT_LOCK;

/**
* Response for OMSnapshotPurgeRequest.
Expand Down Expand Up @@ -116,15 +117,24 @@ private void updateSnapInfo(OmMetadataManagerImpl metadataManager,
*/
private void deleteCheckpointDirectory(OMMetadataManager omMetadataManager,
SnapshotInfo snapshotInfo) {
RDBStore store = (RDBStore) omMetadataManager.getStore();
String checkpointPrefix = store.getDbLocation().getName();
Path snapshotDirPath = Paths.get(store.getSnapshotsParentDir(),
checkpointPrefix + snapshotInfo.getCheckpointDir());
try {
FileUtils.deleteDirectory(snapshotDirPath.toFile());
} catch (IOException ex) {
LOG.error("Failed to delete snapshot directory {} for snapshot {}",
snapshotDirPath, snapshotInfo.getTableKey(), ex);
// Acquiring write lock to avoid a race condition with the sst filtering service, which creates an sst filtered
// file inside the snapshot directory. Any operation which doesn't create/delete files under this snapshot
// directory can run in parallel along with this operation.
OMLockDetails omLockDetails = omMetadataManager.getLock()
.acquireWriteLock(SNAPSHOT_LOCK, snapshotInfo.getVolumeName(), snapshotInfo.getBucketName(),
snapshotInfo.getName());
boolean acquiredSnapshotLock = omLockDetails.isLockAcquired();
if (acquiredSnapshotLock) {
Path snapshotDirPath = OmSnapshotManager.getSnapshotPath(omMetadataManager, snapshotInfo);
try {
FileUtils.deleteDirectory(snapshotDirPath.toFile());
} catch (IOException ex) {
LOG.error("Failed to delete snapshot directory {} for snapshot {}",
snapshotDirPath, snapshotInfo.getTableKey(), ex);
} finally {
omMetadataManager.getLock().releaseWriteLock(SNAPSHOT_LOCK, snapshotInfo.getVolumeName(),
snapshotInfo.getBucketName(), snapshotInfo.getName());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.KeyManagerImpl;
import org.apache.hadoop.ozone.om.OmSnapshot;
import org.apache.hadoop.ozone.om.OmSnapshotManager;
import org.apache.hadoop.ozone.om.OzoneManager;
Expand Down Expand Up @@ -100,7 +99,6 @@ public class SnapshotDeletingService extends AbstractKeyDeletingService {
private final long snapshotDeletionPerTask;
private final int keyLimitPerSnapshot;
private final int ratisByteLimit;
private final boolean isSstFilteringServiceEnabled;

public SnapshotDeletingService(long interval, long serviceTimeout,
OzoneManager ozoneManager, ScmBlockLocationProtocol scmClient)
Expand Down Expand Up @@ -128,8 +126,6 @@ public SnapshotDeletingService(long interval, long serviceTimeout,
this.keyLimitPerSnapshot = conf.getInt(
OZONE_SNAPSHOT_KEY_DELETING_LIMIT_PER_TASK,
OZONE_SNAPSHOT_KEY_DELETING_LIMIT_PER_TASK_DEFAULT);

this.isSstFilteringServiceEnabled = ((KeyManagerImpl) ozoneManager.getKeyManager()).isSstFilteringSvcEnabled();
}

private class SnapshotDeletingTask implements BackgroundTask {
Expand Down Expand Up @@ -594,8 +590,7 @@ public void submitRequest(OMRequest omRequest) {
@VisibleForTesting
boolean shouldIgnoreSnapshot(SnapshotInfo snapInfo) {
SnapshotInfo.SnapshotStatus snapshotStatus = snapInfo.getSnapshotStatus();
return snapshotStatus != SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED
|| (isSstFilteringServiceEnabled && !snapInfo.isSstFiltered());
return snapshotStatus != SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED;
}

// TODO: Move this util class.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,26 +67,23 @@ private static Stream<Arguments> testCasesForIgnoreSnapshotGc() {
SnapshotInfo filteredSnapshot = SnapshotInfo.newBuilder().setSstFiltered(true).setName("snap1").build();
SnapshotInfo unFilteredSnapshot = SnapshotInfo.newBuilder().setSstFiltered(false).setName("snap1").build();
return Stream.of(
Arguments.of(filteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED, true, false),
Arguments.of(filteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE, true, true),
Arguments.of(unFilteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED, true, true),
Arguments.of(unFilteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE, true, true),
Arguments.of(filteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED, false, false),
Arguments.of(unFilteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED, false, false),
Arguments.of(unFilteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE, false, true),
Arguments.of(filteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE, false, true));
Arguments.of(filteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED, false),
Arguments.of(filteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE, true),
Arguments.of(unFilteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED, false),
Arguments.of(unFilteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE, true),
Arguments.of(filteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED, false),
Arguments.of(unFilteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED, false),
Arguments.of(unFilteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE, true),
Arguments.of(filteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE, true));
}

@ParameterizedTest
@MethodSource("testCasesForIgnoreSnapshotGc")
public void testProcessSnapshotLogicInSDS(SnapshotInfo snapshotInfo,
SnapshotInfo.SnapshotStatus status,
boolean sstFilteringServiceEnabled,
boolean expectedOutcome)
throws IOException {
Mockito.when(keyManager.isSstFilteringSvcEnabled()).thenReturn(sstFilteringServiceEnabled);
Mockito.when(omMetadataManager.getSnapshotChainManager()).thenReturn(chainManager);
Mockito.when(ozoneManager.getKeyManager()).thenReturn(keyManager);
Mockito.when(ozoneManager.getOmSnapshotManager()).thenReturn(omSnapshotManager);
Mockito.when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager);
Mockito.when(ozoneManager.getConfiguration()).thenReturn(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public void testIrrelevantSstFileDeletion()
createSnapshot(volumeName, bucketName2, snapshotName1);
SnapshotInfo snapshotInfo = om.getMetadataManager().getSnapshotInfoTable()
.get(SnapshotInfo.getTableKey(volumeName, bucketName2, snapshotName1));
assertFalse(snapshotInfo.isSstFiltered());
assertFalse(SstFilteringService.isSstFiltered(om.getConfiguration(), snapshotInfo));
waitForSnapshotsAtLeast(filteringService, countExistingSnapshots + 1);
assertEquals(countExistingSnapshots + 1, filteringService.getSnapshotFilteredCount().get());

Expand Down Expand Up @@ -238,8 +238,9 @@ public void testIrrelevantSstFileDeletion()

// Need to read the sstFiltered flag which is set in background process and
// hence snapshotInfo.isSstFiltered() may not work sometimes.
assertTrue(om.getMetadataManager().getSnapshotInfoTable().get(SnapshotInfo
.getTableKey(volumeName, bucketName2, snapshotName1)).isSstFiltered());
assertTrue(SstFilteringService.isSstFiltered(om.getConfiguration(),
om.getMetadataManager().getSnapshotInfoTable().get(SnapshotInfo
.getTableKey(volumeName, bucketName2, snapshotName1))));

String snapshotName2 = "snapshot2";
final long count;
Expand Down Expand Up @@ -313,7 +314,7 @@ public void testActiveAndDeletedSnapshotCleanup() throws Exception {
.filter(f -> f.getName().endsWith(SST_FILE_EXTENSION)).count();

// delete snap1
writeClient.deleteSnapshot(volumeName, bucketNames.get(0), "snap1");
deleteSnapshot(volumeName, bucketNames.get(0), "snap1");
sstFilteringService.resume();
// Filtering service will only act on snap2 as it is an active snapshot
waitForSnapshotsAtLeast(sstFilteringService, countTotalSnapshots);
Expand Down Expand Up @@ -505,4 +506,9 @@ private void createSnapshot(String volumeName, String bucketName, String snapsho
writeClient.createSnapshot(volumeName, bucketName, snapshotName);
countTotalSnapshots++;
}

/**
 * Deletes a snapshot through the write client and lowers the running snapshot total by one.
 *
 * @param volumeName volume of the snapshot
 * @param bucketName bucket of the snapshot
 * @param snapshotName name of the snapshot to delete
 * @throws IOException if the write client's delete call fails
 */
private void deleteSnapshot(String volumeName, String bucketName, String snapshotName) throws IOException {
  writeClient.deleteSnapshot(volumeName, bucketName, snapshotName);
  // Reached only when the delete call above did not throw.
  countTotalSnapshots -= 1;
}
}

0 comments on commit 9c4bf8b

Please sign in to comment.