Skip to content

Commit

Permalink
HDDS-9198. Changed snapshot purge to single purge instead of batch purge
Browse files Browse the repository at this point in the history
  • Loading branch information
hemantk-12 committed Aug 1, 2024
1 parent 5118f23 commit 24b6849
Show file tree
Hide file tree
Showing 5 changed files with 304 additions and 195 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1937,8 +1937,9 @@ message SnapshotMoveKeyInfos {
}

message SnapshotPurgeRequest {
repeated string snapshotDBKeys = 1;
repeated string snapshotDBKeys = 1 [deprecated = true];
repeated string updatedSnapshotDBKey = 2 [deprecated = true];
optional string snapshotKey = 3;
}

message SetSnapshotPropertyRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@

package org.apache.hadoop.ozone.om.request.snapshot;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OMMetrics;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.response.snapshot.OMSnapshotPurgeResponse.OmPurgeResponse;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
Expand All @@ -43,11 +46,11 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;

Expand All @@ -71,103 +74,124 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn

final long trxnLogIndex = termIndex.getIndex();

OmSnapshotManager omSnapshotManager = ozoneManager.getOmSnapshotManager();
OmMetadataManagerImpl omMetadataManager = (OmMetadataManagerImpl)
ozoneManager.getMetadataManager();
SnapshotChainManager snapshotChainManager =
omMetadataManager.getSnapshotChainManager();

OMClientResponse omClientResponse = null;
OMClientResponse omClientResponse;

OzoneManagerProtocolProtos.OMResponse.Builder omResponse =
OmResponseUtil.getOMResponseBuilder(getOmRequest());
SnapshotPurgeRequest snapshotPurgeRequest = getOmRequest()
.getSnapshotPurgeRequest();

try {
List<String> snapshotDbKeys = snapshotPurgeRequest
.getSnapshotDBKeysList();
Map<String, SnapshotInfo> updatedSnapInfos = new HashMap<>();
Map<String, SnapshotInfo> updatedPathPreviousAndGlobalSnapshots =
new HashMap<>();

// Each snapshot purge operation does three things:
// 1. Update the snapshot chain,
// 2. Update the deep clean flag for the next active snapshot (So that it can be
// deep cleaned by the KeyDeletingService in the next run),
// 3. Finally, purge the snapshot.
// All of these steps have to be performed only when it acquires all the necessary
// locks (lock on the snapshot to be purged, lock on the next active snapshot, and
// lock on the next path and global previous snapshots). Ideally, there is no need
// for locks for snapshot purge and can rely on OMStateMachine because OMStateMachine
// is going to process each request sequentially.
//
// But there is a problem with that. After filtering unnecessary SST files for a snapshot,
// SstFilteringService updates that snapshot's SstFilter flag. SstFilteringService cannot
// use SetSnapshotProperty API because it runs on each OM independently and One OM does
// not know if the snapshot has been filtered on the other OM in HA environment.
//
// If locks are not taken snapshot purge and SstFilteringService will cause a race condition
// and override one's update with another.
for (String snapTableKey : snapshotDbKeys) {
// To acquire all the locks, a set is maintained which is keyed by snapshotTableKey.
// snapshotTableKey is nothing but /volumeName/bucketName/snapshotName.
// Once all the locks are acquired, it performs the three steps mentioned above and
// release all the locks after that.
Set<Triple<String, String, String>> lockSet = new HashSet<>(4, 1);
try {
if (omMetadataManager.getSnapshotInfoTable().get(snapTableKey) == null) {
// Snapshot may have been purged in the previous iteration of SnapshotDeletingService.
LOG.warn("The snapshot {} is not longer in snapshot table, It maybe removed in the previous " +
"Snapshot purge request.", snapTableKey);
continue;
}

acquireLock(lockSet, snapTableKey, omMetadataManager);
SnapshotInfo fromSnapshot = omMetadataManager.getSnapshotInfoTable().get(snapTableKey);

SnapshotInfo nextSnapshot =
SnapshotUtils.getNextActiveSnapshot(fromSnapshot, snapshotChainManager, omSnapshotManager);

if (nextSnapshot != null) {
acquireLock(lockSet, nextSnapshot.getTableKey(), omMetadataManager);
}

// Update the chain first so that it has all the necessary locks before updating deep clean.
updateSnapshotChainAndCache(lockSet, omMetadataManager, fromSnapshot, trxnLogIndex,
updatedPathPreviousAndGlobalSnapshots);
updateSnapshotInfoAndCache(nextSnapshot, omMetadataManager, trxnLogIndex, updatedSnapInfos);
// Remove and close snapshot's RocksDB instance from SnapshotCache.
omSnapshotManager.invalidateCacheEntry(fromSnapshot.getSnapshotId());
// Update SnapshotInfoTable cache.
omMetadataManager.getSnapshotInfoTable()
.addCacheEntry(new CacheKey<>(fromSnapshot.getTableKey()), CacheValue.get(trxnLogIndex));
} finally {
for (Triple<String, String, String> lockKey: lockSet) {
omMetadataManager.getLock()
.releaseWriteLock(SNAPSHOT_LOCK, lockKey.getLeft(), lockKey.getMiddle(), lockKey.getRight());
}
// This is for backward compatibility. If ratis has batch purge transactions which are replayed
// either during service upgraded or later for any other reason when OM has been upgraded to non-batch.
if (!snapshotPurgeRequest.getSnapshotDBKeysList().isEmpty()) {
List<OmPurgeResponse> omPurgeResponses = new ArrayList<>();
for (String snapshotKey : snapshotPurgeRequest.getSnapshotDBKeysList()) {
OmPurgeResponse omPurgeResponse = purgeSnapshot(snapshotKey, ozoneManager, trxnLogIndex);
omPurgeResponses.add(omPurgeResponse);
}
omClientResponse = new OMSnapshotPurgeResponse(omResponse.build(), omPurgeResponses);
LOG.info("OmPurgeResponse list: {}.", omPurgeResponses);
} else if (snapshotPurgeRequest.hasSnapshotKey()) {
OmPurgeResponse omPurgeResponse =
purgeSnapshot(snapshotPurgeRequest.getSnapshotKey(), ozoneManager, trxnLogIndex);
omClientResponse = new OMSnapshotPurgeResponse(omResponse.build(), omPurgeResponse);
LOG.info("OmPurgeResponse: {}.", omPurgeResponse);
} else {
throw new OMException("Both snapshotDBKeysList and snapshotKey are null." +
" One of them is required for snapshot purge operations.", OMException.ResultCodes.INVALID_REQUEST);
}

omClientResponse = new OMSnapshotPurgeResponse(omResponse.build(),
snapshotDbKeys, updatedSnapInfos,
updatedPathPreviousAndGlobalSnapshots);

omMetrics.incNumSnapshotPurges();
LOG.info("Successfully executed snapshotPurgeRequest: {{}} along with updating deep clean flags for " +
"snapshots: {} and global and previous for snapshots:{}.",
snapshotPurgeRequest, updatedSnapInfos.keySet(), updatedPathPreviousAndGlobalSnapshots.keySet());
LOG.debug("Successfully executed snapshotPurgeRequest: {{}}.", snapshotPurgeRequest);
} catch (IOException ex) {
omClientResponse = new OMSnapshotPurgeResponse(
createErrorOMResponse(omResponse, ex));
omClientResponse = new OMSnapshotPurgeResponse(createErrorOMResponse(omResponse, ex));
omMetrics.incNumSnapshotPurgeFails();
LOG.error("Failed to execute snapshotPurgeRequest:{{}}.", snapshotPurgeRequest, ex);
}

return omClientResponse;
}

// Each snapshot purge operation does three things:
// 1. Update the snapshot chain,
// 2. Update the deep clean flag for the next active snapshot (So that it can be
// deep cleaned by the KeyDeletingService in the next run),
// 3. Finally, purge the snapshot.
// All of these steps have to be performed only when it acquires all the necessary
// locks (lock on the snapshot to be purged, lock on the next active snapshot, and
// lock on the next path and global previous snapshots). Ideally, there is no need
// for locks for snapshot purge and can rely on OMStateMachine because OMStateMachine
// is going to process each request sequentially.
//
// But there is a problem with that. After filtering unnecessary SST files for a snapshot,
// SstFilteringService updates that snapshot's SstFilter flag. SstFilteringService cannot
// use SetSnapshotProperty API because it runs on each OM independently and One OM does
// not know if the snapshot has been filtered on the other OM in HA environment.
//
// If the locks are not taken, snapshot purge and SstFilteringService can race with each other,
// and one of them can overwrite the other's update.
/**
 * Purges a single snapshot while holding write locks on every snapshot it touches.
 *
 * <p>The purge is done in three steps: the snapshot chain is updated, the deep clean
 * flag of the next active snapshot is reset, and finally the snapshot itself is
 * removed from the SnapshotInfoTable cache. All locks collected in the lock set are
 * released in the {@code finally} block, whether or not the purge succeeded.
 *
 * @param snapshotKey SnapshotInfoTable key of the snapshot to purge.
 * @param ozoneManager OzoneManager used to reach the snapshot manager and metadata manager.
 * @param trxnLogIndex transaction log index stamped on every cache entry written here.
 * @return an {@link OmPurgeResponse} built from the purged snapshot key and the updated
 *         next path, next global and next active snapshot infos (each may be null).
 * @throws IOException an {@link OMException} with {@code FILE_NOT_FOUND} if the snapshot
 *         is not present in the snapshot table, or any error raised by the table/lock calls.
 */
private OmPurgeResponse purgeSnapshot(String snapshotKey,
    OzoneManager ozoneManager,
    long trxnLogIndex) throws IOException {

  OmSnapshotManager omSnapshotManager = ozoneManager.getOmSnapshotManager();
  OmMetadataManagerImpl omMetadataManager = (OmMetadataManagerImpl) ozoneManager.getMetadataManager();
  SnapshotChainManager snapshotChainManager = omMetadataManager.getSnapshotChainManager();

  if (omMetadataManager.getSnapshotInfoTable().get(snapshotKey) == null) {
    // Snapshot may have been purged in the previous iteration of SnapshotDeletingService.
    throw new OMException("Snapshot: '" + snapshotKey + "' no longer exists " +
        "in snapshot table. Might be removed in previous run.", OMException.ResultCodes.FILE_NOT_FOUND);
  }

  // To acquire all the locks, a set is maintained which is keyed by a triple of volumeName, bucketName and
  // snapshotName. The SnapshotInfoTable key (which is /volumeName/bucketName/snapshotName) is not used directly
  // because volumeName, bucketName and snapshotName can't be obtained after purging the snapshot from the cache.
  // Once all the necessary locks are acquired, the three steps mentioned above are performed and
  // the locks are released after that.
  // Initial capacity 4 with load factor 1.
  Set<Triple<String, String, String>> lockSet = new HashSet<>(4, 1);

  try {
    acquireLock(lockSet, snapshotKey, omMetadataManager);

    // Read the snapshot again now that its lock is held.
    SnapshotInfo fromSnapshot = omMetadataManager.getSnapshotInfoTable().get(snapshotKey);
    SnapshotInfo nextSnapshot =
        SnapshotUtils.getNextActiveSnapshot(fromSnapshot, snapshotChainManager, omSnapshotManager);

    if (nextSnapshot != null) {
      acquireLock(lockSet, nextSnapshot.getTableKey(), omMetadataManager);
    }

    // Step 1: Update the snapshot chain.
    Pair<SnapshotInfo, SnapshotInfo> pathAndGlobalSnapshotInfo =
        updateSnapshotChainAndCache(lockSet, omMetadataManager, fromSnapshot, trxnLogIndex);
    SnapshotInfo nextPathSnapshotInfo = null;
    SnapshotInfo nextGlobalSnapshotInfo = null;

    // A null pair means the chain update returned without touching any neighbour.
    if (pathAndGlobalSnapshotInfo != null) {
      nextPathSnapshotInfo = pathAndGlobalSnapshotInfo.getLeft();
      nextGlobalSnapshotInfo = pathAndGlobalSnapshotInfo.getRight();
    }

    // Step 2: Update the deep clean flag for the next active snapshot.
    SnapshotInfo nextActiveSnapshotInfo = updateSnapshotInfoAndCache(nextSnapshot, omMetadataManager, trxnLogIndex);

    // Remove and close snapshot's RocksDB instance from SnapshotCache.
    omSnapshotManager.invalidateCacheEntry(fromSnapshot.getSnapshotId());

    // Step 3: Purge the snapshot from SnapshotInfoTable cache. A cache value that carries
    // only the transaction index (no SnapshotInfo) is what is written for the purged key.
    omMetadataManager.getSnapshotInfoTable()
        .addCacheEntry(new CacheKey<>(fromSnapshot.getTableKey()), CacheValue.get(trxnLogIndex));

    return new OmPurgeResponse(snapshotKey, nextPathSnapshotInfo, nextGlobalSnapshotInfo,
        nextActiveSnapshotInfo);
  } finally {
    // Release every lock that was collected, including those added by the chain update.
    lockSet.forEach(lockKey -> omMetadataManager.getLock()
        .releaseWriteLock(SNAPSHOT_LOCK, lockKey.getLeft(), lockKey.getMiddle(), lockKey.getRight()));
  }
}

private void acquireLock(Set<Triple<String, String, String>> lockSet, String snapshotTableKey,
OMMetadataManager omMetadataManager) throws IOException {
SnapshotInfo snapshotInfo = omMetadataManager.getSnapshotInfoTable().get(snapshotTableKey);
Expand All @@ -187,23 +211,25 @@ private void acquireLock(Set<Triple<String, String, String>> lockSet, String sna
}
}

private void updateSnapshotInfoAndCache(SnapshotInfo snapInfo,
OmMetadataManagerImpl omMetadataManager, long trxnLogIndex,
Map<String, SnapshotInfo> updatedSnapInfos) throws IOException {
if (snapInfo != null) {
// Fetch the latest value again after acquiring lock.
SnapshotInfo updatedSnapshotInfo = omMetadataManager.getSnapshotInfoTable().get(snapInfo.getTableKey());

// Setting next snapshot deep clean to false, Since the
// current snapshot is deleted. We can potentially
// reclaim more keys in the next snapshot.
updatedSnapshotInfo.setDeepClean(false);

// Update table cache first
omMetadataManager.getSnapshotInfoTable().addCacheEntry(new CacheKey<>(updatedSnapshotInfo.getTableKey()),
CacheValue.get(trxnLogIndex, updatedSnapshotInfo));
updatedSnapInfos.put(updatedSnapshotInfo.getTableKey(), updatedSnapshotInfo);
/**
 * Resets the deep clean flag of the given snapshot and publishes the change to the
 * SnapshotInfoTable cache.
 *
 * @param snapInfo snapshot whose deep clean flag should be reset; may be null.
 * @param omMetadataManager metadata manager that owns the SnapshotInfoTable.
 * @param trxnLogIndex transaction log index stamped on the cache entry.
 * @return the freshly read and updated SnapshotInfo, or null when {@code snapInfo} is null.
 * @throws IOException if reading the snapshot info table fails.
 */
private SnapshotInfo updateSnapshotInfoAndCache(SnapshotInfo snapInfo, OmMetadataManagerImpl omMetadataManager,
    long trxnLogIndex) throws IOException {
  SnapshotInfo refreshedInfo = null;

  if (snapInfo != null) {
    // Re-read the entry: the caller holds the lock by now, so this is the latest value.
    refreshedInfo = omMetadataManager.getSnapshotInfoTable().get(snapInfo.getTableKey());

    // The current snapshot is deleted, so more keys can potentially be reclaimed
    // from the next snapshot; clearing the flag makes it eligible for deep clean again.
    refreshedInfo.setDeepClean(false);

    // Publish the modified info to the table cache.
    omMetadataManager.getSnapshotInfoTable().addCacheEntry(
        new CacheKey<>(refreshedInfo.getTableKey()),
        CacheValue.get(trxnLogIndex, refreshedInfo));
  }

  return refreshedInfo;
}

/**
Expand All @@ -212,19 +238,14 @@ private void updateSnapshotInfoAndCache(SnapshotInfo snapInfo,
* It also returns the pair of updated next path and global snapshots to
* update in DB.
*/
private void updateSnapshotChainAndCache(
private Pair<SnapshotInfo, SnapshotInfo> updateSnapshotChainAndCache(
Set<Triple<String, String, String>> lockSet,
OmMetadataManagerImpl metadataManager,
SnapshotInfo snapInfo,
long trxnLogIndex,
Map<String, SnapshotInfo> updatedPathPreviousAndGlobalSnapshots
long trxnLogIndex
) throws IOException {
if (snapInfo == null) {
return;
}

SnapshotChainManager snapshotChainManager = metadataManager
.getSnapshotChainManager();
Table<String, SnapshotInfo> snapshotInfoTable = metadataManager.getSnapshotInfoTable();
SnapshotChainManager snapshotChainManager = metadataManager.getSnapshotChainManager();

// If the snapshot is deleted in the previous run, then the in-memory
// SnapshotChainManager might throw NoSuchElementException as the snapshot
Expand All @@ -237,16 +258,15 @@ private void updateSnapshotChainAndCache(
hasNextGlobalSnapshot = snapshotChainManager.hasNextGlobalSnapshot(
snapInfo.getSnapshotId());
} catch (NoSuchElementException ex) {
return;
return null;
}

String nextPathSnapshotKey = null;

if (hasNextPathSnapshot) {
UUID nextPathSnapshotId = snapshotChainManager.nextPathSnapshot(
snapInfo.getSnapshotPath(), snapInfo.getSnapshotId());
nextPathSnapshotKey = snapshotChainManager
.getTableKey(nextPathSnapshotId);
nextPathSnapshotKey = snapshotChainManager.getTableKey(nextPathSnapshotId);

// Acquire lock from the snapshot
acquireLock(lockSet, nextPathSnapshotKey, metadataManager);
Expand All @@ -262,43 +282,35 @@ private void updateSnapshotChainAndCache(
}

SnapshotInfo nextPathSnapInfo =
nextPathSnapshotKey != null ? metadataManager.getSnapshotInfoTable().get(nextPathSnapshotKey) : null;
nextPathSnapshotKey != null ? snapshotInfoTable.get(nextPathSnapshotKey) : null;

SnapshotInfo nextGlobalSnapInfo =
nextGlobalSnapshotKey != null ? metadataManager.getSnapshotInfoTable().get(nextGlobalSnapshotKey) : null;
nextGlobalSnapshotKey != null ? snapshotInfoTable.get(nextGlobalSnapshotKey) : null;

// Updates next path snapshot's previous snapshot ID
if (nextPathSnapInfo != null) {
nextPathSnapInfo.setPathPreviousSnapshotId(snapInfo.getPathPreviousSnapshotId());
metadataManager.getSnapshotInfoTable().addCacheEntry(
new CacheKey<>(nextPathSnapInfo.getTableKey()),
snapshotInfoTable.addCacheEntry(new CacheKey<>(nextPathSnapInfo.getTableKey()),
CacheValue.get(trxnLogIndex, nextPathSnapInfo));
updatedPathPreviousAndGlobalSnapshots
.put(nextPathSnapInfo.getTableKey(), nextPathSnapInfo);
}

// Updates next global snapshot's previous snapshot ID
// If both next global and path snapshot are same, it may overwrite
// nextPathSnapInfo.setPathPreviousSnapshotID(), adding this check
// will prevent it.
if (nextGlobalSnapInfo != null && nextPathSnapInfo != null &&
nextGlobalSnapInfo.getSnapshotId().equals(nextPathSnapInfo.getSnapshotId())) {
Objects.equals(nextGlobalSnapInfo.getSnapshotId(), nextPathSnapInfo.getSnapshotId())) {
nextPathSnapInfo.setGlobalPreviousSnapshotId(snapInfo.getGlobalPreviousSnapshotId());
metadataManager.getSnapshotInfoTable().addCacheEntry(
new CacheKey<>(nextPathSnapInfo.getTableKey()),
snapshotInfoTable.addCacheEntry(new CacheKey<>(nextPathSnapInfo.getTableKey()),
CacheValue.get(trxnLogIndex, nextPathSnapInfo));
updatedPathPreviousAndGlobalSnapshots
.put(nextPathSnapInfo.getTableKey(), nextPathSnapInfo);
nextGlobalSnapInfo = nextPathSnapInfo;
} else if (nextGlobalSnapInfo != null) {
nextGlobalSnapInfo.setGlobalPreviousSnapshotId(
snapInfo.getGlobalPreviousSnapshotId());
metadataManager.getSnapshotInfoTable().addCacheEntry(
new CacheKey<>(nextGlobalSnapInfo.getTableKey()),
nextGlobalSnapInfo.setGlobalPreviousSnapshotId(snapInfo.getGlobalPreviousSnapshotId());
snapshotInfoTable.addCacheEntry(new CacheKey<>(nextGlobalSnapInfo.getTableKey()),
CacheValue.get(trxnLogIndex, nextGlobalSnapInfo));
updatedPathPreviousAndGlobalSnapshots
.put(nextGlobalSnapInfo.getTableKey(), nextGlobalSnapInfo);
}

snapshotChainManager.deleteSnapshot(snapInfo);
return Pair.of(nextPathSnapInfo, nextGlobalSnapInfo);
}
}
Loading

0 comments on commit 24b6849

Please sign in to comment.