From 24b6849518f5d4a1730a065410648eed94947e86 Mon Sep 17 00:00:00 2001 From: Hemant Kumar Date: Thu, 1 Aug 2024 13:23:12 -0700 Subject: [PATCH] HDDS-9198. Changed snapshot purge to single purge instead of batch purge --- .../src/main/proto/OmClientProtocol.proto | 3 +- .../snapshot/OMSnapshotPurgeRequest.java | 262 +++++++++--------- .../snapshot/OMSnapshotPurgeResponse.java | 133 ++++++--- .../om/service/SnapshotDeletingService.java | 4 +- ...TestOMSnapshotPurgeRequestAndResponse.java | 97 +++++-- 5 files changed, 304 insertions(+), 195 deletions(-) diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto index 86403439041..f704ddaf47e 100644 --- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto +++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto @@ -1937,8 +1937,9 @@ message SnapshotMoveKeyInfos { } message SnapshotPurgeRequest { - repeated string snapshotDBKeys = 1; + repeated string snapshotDBKeys = 1 [deprecated = true]; repeated string updatedSnapshotDBKey = 2 [deprecated = true]; + optional string snapshotKey = 3; } message SetSnapshotPropertyRequest { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotPurgeRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotPurgeRequest.java index 29c7628e3cc..caf55296e36 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotPurgeRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotPurgeRequest.java @@ -19,10 +19,13 @@ package org.apache.hadoop.ozone.om.request.snapshot; +import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Triple; +import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.OMMetrics; import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.response.snapshot.OMSnapshotPurgeResponse.OmPurgeResponse; import org.apache.ratis.server.protocol.TermIndex; import org.apache.hadoop.hdds.utils.db.cache.CacheKey; import org.apache.hadoop.hdds.utils.db.cache.CacheValue; @@ -43,11 +46,11 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.HashMap; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.Set; import java.util.UUID; @@ -71,13 +74,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn final long trxnLogIndex = termIndex.getIndex(); - OmSnapshotManager omSnapshotManager = ozoneManager.getOmSnapshotManager(); - OmMetadataManagerImpl omMetadataManager = (OmMetadataManagerImpl) - ozoneManager.getMetadataManager(); - SnapshotChainManager snapshotChainManager = - omMetadataManager.getSnapshotChainManager(); - - OMClientResponse omClientResponse = null; + OMClientResponse omClientResponse; OzoneManagerProtocolProtos.OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(getOmRequest()); @@ -85,82 +82,30 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn .getSnapshotPurgeRequest(); try { - List snapshotDbKeys = snapshotPurgeRequest - .getSnapshotDBKeysList(); - Map updatedSnapInfos = new HashMap<>(); - Map updatedPathPreviousAndGlobalSnapshots = - new HashMap<>(); - - // Each snapshot purge operation does three things: - // 1. Update the snapshot chain, - // 2. Update the deep clean flag for the next active snapshot (So that it can be - // deep cleaned by the KeyDeletingService in the next run), - // 3. Finally, purge the snapshot. - // All of these steps have to be performed only when it acquires all the necessary - // locks (lock on the snapshot to be purged, lock on the next active snapshot, and - // lock on the next path and global previous snapshots). Ideally, there is no need - // for locks for snapshot purge and can rely on OMStateMachine because OMStateMachine - // is going to process each request sequentially. - // - // But there is a problem with that. After filtering unnecessary SST files for a snapshot, - // SstFilteringService updates that snapshot's SstFilter flag. SstFilteringService cannot - // use SetSnapshotProperty API because it runs on each OM independently and One OM does - // not know if the snapshot has been filtered on the other OM in HA environment. - // - // If locks are not taken snapshot purge and SstFilteringService will cause a race condition - // and override one's update with another. - for (String snapTableKey : snapshotDbKeys) { - // To acquire all the locks, a set is maintained which is keyed by snapshotTableKey. - // snapshotTableKey is nothing but /volumeName/bucketName/snapshotName. - // Once all the locks are acquired, it performs the three steps mentioned above and - // release all the locks after that. - Set> lockSet = new HashSet<>(4, 1); - try { - if (omMetadataManager.getSnapshotInfoTable().get(snapTableKey) == null) { - // Snapshot may have been purged in the previous iteration of SnapshotDeletingService. - LOG.warn("The snapshot {} is not longer in snapshot table, It maybe removed in the previous " + - "Snapshot purge request.", snapTableKey); - continue; - } - - acquireLock(lockSet, snapTableKey, omMetadataManager); - SnapshotInfo fromSnapshot = omMetadataManager.getSnapshotInfoTable().get(snapTableKey); - - SnapshotInfo nextSnapshot = - SnapshotUtils.getNextActiveSnapshot(fromSnapshot, snapshotChainManager, omSnapshotManager); - - if (nextSnapshot != null) { - acquireLock(lockSet, nextSnapshot.getTableKey(), omMetadataManager); - } - - // Update the chain first so that it has all the necessary locks before updating deep clean. - updateSnapshotChainAndCache(lockSet, omMetadataManager, fromSnapshot, trxnLogIndex, - updatedPathPreviousAndGlobalSnapshots); - updateSnapshotInfoAndCache(nextSnapshot, omMetadataManager, trxnLogIndex, updatedSnapInfos); - // Remove and close snapshot's RocksDB instance from SnapshotCache. - omSnapshotManager.invalidateCacheEntry(fromSnapshot.getSnapshotId()); - // Update SnapshotInfoTable cache. - omMetadataManager.getSnapshotInfoTable() - .addCacheEntry(new CacheKey<>(fromSnapshot.getTableKey()), CacheValue.get(trxnLogIndex)); - } finally { - for (Triple lockKey: lockSet) { - omMetadataManager.getLock() - .releaseWriteLock(SNAPSHOT_LOCK, lockKey.getLeft(), lockKey.getMiddle(), lockKey.getRight()); - } + // This is for backward compatibility. If ratis has batch purge transactions which are replayed + // either during service upgraded or later for any other reason when OM has been upgraded to non-batch. + if (!snapshotPurgeRequest.getSnapshotDBKeysList().isEmpty()) { + List omPurgeResponses = new ArrayList<>(); + for (String snapshotKey : snapshotPurgeRequest.getSnapshotDBKeysList()) { + OmPurgeResponse omPurgeResponse = purgeSnapshot(snapshotKey, ozoneManager, trxnLogIndex); + omPurgeResponses.add(omPurgeResponse); } + omClientResponse = new OMSnapshotPurgeResponse(omResponse.build(), omPurgeResponses); + LOG.info("OmPurgeResponse list: {}.", omPurgeResponses); + } else if (snapshotPurgeRequest.hasSnapshotKey()) { + OmPurgeResponse omPurgeResponse = + purgeSnapshot(snapshotPurgeRequest.getSnapshotKey(), ozoneManager, trxnLogIndex); + omClientResponse = new OMSnapshotPurgeResponse(omResponse.build(), omPurgeResponse); + LOG.info("OmPurgeResponse: {}.", omPurgeResponse); + } else { + throw new OMException("Both snapshotDBKeysList and snapshotKey are null." + + " One of them is required for snapshot purge operations.", OMException.ResultCodes.INVALID_REQUEST); } - omClientResponse = new OMSnapshotPurgeResponse(omResponse.build(), - snapshotDbKeys, updatedSnapInfos, - updatedPathPreviousAndGlobalSnapshots); - omMetrics.incNumSnapshotPurges(); - LOG.info("Successfully executed snapshotPurgeRequest: {{}} along with updating deep clean flags for " + - "snapshots: {} and global and previous for snapshots:{}.", - snapshotPurgeRequest, updatedSnapInfos.keySet(), updatedPathPreviousAndGlobalSnapshots.keySet()); + LOG.debug("Successfully executed snapshotPurgeRequest: {{}}.", snapshotPurgeRequest); } catch (IOException ex) { - omClientResponse = new OMSnapshotPurgeResponse( - createErrorOMResponse(omResponse, ex)); + omClientResponse = new OMSnapshotPurgeResponse(createErrorOMResponse(omResponse, ex)); omMetrics.incNumSnapshotPurgeFails(); LOG.error("Failed to execute snapshotPurgeRequest:{{}}.", snapshotPurgeRequest, ex); } @@ -168,6 +113,85 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn return omClientResponse; } + // Each snapshot purge operation does three things: + // 1. Update the snapshot chain, + // 2. Update the deep clean flag for the next active snapshot (So that it can be + // deep cleaned by the KeyDeletingService in the next run), + // 3. Finally, purge the snapshot. + // All of these steps have to be performed only when it acquires all the necessary + // locks (lock on the snapshot to be purged, lock on the next active snapshot, and + // lock on the next path and global previous snapshots). Ideally, there is no need + // for locks for snapshot purge and can rely on OMStateMachine because OMStateMachine + // is going to process each request sequentially. + // + // But there is a problem with that. After filtering unnecessary SST files for a snapshot, + // SstFilteringService updates that snapshot's SstFilter flag. SstFilteringService cannot + // use SetSnapshotProperty API because it runs on each OM independently and One OM does + // not know if the snapshot has been filtered on the other OM in HA environment. + // + // If locks are not taken snapshot purge and SstFilteringService will cause a race condition + // and override one's update with another. + private OmPurgeResponse purgeSnapshot(String snapshotKey, + OzoneManager ozoneManager, + long trxnLogIndex) throws IOException { + + OmSnapshotManager omSnapshotManager = ozoneManager.getOmSnapshotManager(); + OmMetadataManagerImpl omMetadataManager = (OmMetadataManagerImpl) ozoneManager.getMetadataManager(); + SnapshotChainManager snapshotChainManager = omMetadataManager.getSnapshotChainManager(); + + if (omMetadataManager.getSnapshotInfoTable().get(snapshotKey) == null) { + // Snapshot may have been purged in the previous iteration of SnapshotDeletingService. + throw new OMException("Snapshot: '" + snapshotKey + "}' is no longer exist " + + "in snapshot table. Might be removed in previous run.", OMException.ResultCodes.FILE_NOT_FOUND); + } + + // To acquire all the locks, a set is maintained which is keyed by a triple of volumeName, bucketName and + // snapshotName. SnapshotInfoTable key (which is /volumeName/bucketName/snapshotName) is not directly + // because volumeName, bucketName and snapshotName can't be obtained after purging snapshot from cache. + // Once all the necessary locks are acquired, the three steps mentioned above are performed and + // locks are release after that. + Set> lockSet = new HashSet<>(4, 1); + + try { + acquireLock(lockSet, snapshotKey, omMetadataManager); + + SnapshotInfo fromSnapshot = omMetadataManager.getSnapshotInfoTable().get(snapshotKey); + SnapshotInfo nextSnapshot = + SnapshotUtils.getNextActiveSnapshot(fromSnapshot, snapshotChainManager, omSnapshotManager); + + if (nextSnapshot != null) { + acquireLock(lockSet, nextSnapshot.getTableKey(), omMetadataManager); + } + + // Step 1: Update the snapshot chain. + Pair pathToGlobalSnapshotInto = + updateSnapshotChainAndCache(lockSet, omMetadataManager, fromSnapshot, trxnLogIndex); + SnapshotInfo nextPathSnapshotInfo = null; + SnapshotInfo nextGlobalSnapshotInfo = null; + + if (pathToGlobalSnapshotInto != null) { + nextPathSnapshotInfo = pathToGlobalSnapshotInto.getLeft(); + nextGlobalSnapshotInfo = pathToGlobalSnapshotInto.getRight(); + } + + // Step 2: Update the deep clean flag for the next active snapshot + SnapshotInfo nextActiveSnapshotInfo = updateSnapshotInfoAndCache(nextSnapshot, omMetadataManager, trxnLogIndex); + + // Remove and close snapshot's RocksDB instance from SnapshotCache. + ozoneManager.getOmSnapshotManager().invalidateCacheEntry(fromSnapshot.getSnapshotId()); + + // Step 3: Purge the snapshot from SnapshotInfoTable cache. + ozoneManager.getMetadataManager().getSnapshotInfoTable() + .addCacheEntry(new CacheKey<>(fromSnapshot.getTableKey()), CacheValue.get(trxnLogIndex)); + + return new OmPurgeResponse(snapshotKey, nextPathSnapshotInfo, nextGlobalSnapshotInfo, + nextActiveSnapshotInfo); + } finally { + lockSet.forEach(lockKey -> omMetadataManager.getLock() + .releaseWriteLock(SNAPSHOT_LOCK, lockKey.getLeft(), lockKey.getMiddle(), lockKey.getRight())); + } + } + private void acquireLock(Set> lockSet, String snapshotTableKey, OMMetadataManager omMetadataManager) throws IOException { SnapshotInfo snapshotInfo = omMetadataManager.getSnapshotInfoTable().get(snapshotTableKey); @@ -187,23 +211,25 @@ private void acquireLock(Set> lockSet, String sna } } - private void updateSnapshotInfoAndCache(SnapshotInfo snapInfo, - OmMetadataManagerImpl omMetadataManager, long trxnLogIndex, - Map updatedSnapInfos) throws IOException { - if (snapInfo != null) { - // Fetch the latest value again after acquiring lock. - SnapshotInfo updatedSnapshotInfo = omMetadataManager.getSnapshotInfoTable().get(snapInfo.getTableKey()); - - // Setting next snapshot deep clean to false, Since the - // current snapshot is deleted. We can potentially - // reclaim more keys in the next snapshot. - updatedSnapshotInfo.setDeepClean(false); - - // Update table cache first - omMetadataManager.getSnapshotInfoTable().addCacheEntry(new CacheKey<>(updatedSnapshotInfo.getTableKey()), - CacheValue.get(trxnLogIndex, updatedSnapshotInfo)); - updatedSnapInfos.put(updatedSnapshotInfo.getTableKey(), updatedSnapshotInfo); + private SnapshotInfo updateSnapshotInfoAndCache(SnapshotInfo snapInfo, OmMetadataManagerImpl omMetadataManager, + long trxnLogIndex) throws IOException { + if (snapInfo == null) { + return null; } + + // Fetch the latest value again after acquiring lock. + SnapshotInfo updatedSnapshotInfo = omMetadataManager.getSnapshotInfoTable().get(snapInfo.getTableKey()); + + // Setting next snapshot deep clean to false, Since the + // current snapshot is deleted. We can potentially + // reclaim more keys in the next snapshot. + updatedSnapshotInfo.setDeepClean(false); + + // Update table cache first + omMetadataManager.getSnapshotInfoTable() + .addCacheEntry(new CacheKey<>(updatedSnapshotInfo.getTableKey()), + CacheValue.get(trxnLogIndex, updatedSnapshotInfo)); + return updatedSnapshotInfo; } /** @@ -212,19 +238,14 @@ private void updateSnapshotInfoAndCache(SnapshotInfo snapInfo, * It also returns the pair of updated next path and global snapshots to * update in DB. */ - private void updateSnapshotChainAndCache( + private Pair updateSnapshotChainAndCache( Set> lockSet, OmMetadataManagerImpl metadataManager, SnapshotInfo snapInfo, - long trxnLogIndex, - Map updatedPathPreviousAndGlobalSnapshots + long trxnLogIndex ) throws IOException { - if (snapInfo == null) { - return; - } - - SnapshotChainManager snapshotChainManager = metadataManager - .getSnapshotChainManager(); + Table snapshotInfoTable = metadataManager.getSnapshotInfoTable(); + SnapshotChainManager snapshotChainManager = metadataManager.getSnapshotChainManager(); // If the snapshot is deleted in the previous run, then the in-memory // SnapshotChainManager might throw NoSuchElementException as the snapshot @@ -237,7 +258,7 @@ private void updateSnapshotChainAndCache( hasNextGlobalSnapshot = snapshotChainManager.hasNextGlobalSnapshot( snapInfo.getSnapshotId()); } catch (NoSuchElementException ex) { - return; + return null; } String nextPathSnapshotKey = null; @@ -245,8 +266,7 @@ private void updateSnapshotChainAndCache( if (hasNextPathSnapshot) { UUID nextPathSnapshotId = snapshotChainManager.nextPathSnapshot( snapInfo.getSnapshotPath(), snapInfo.getSnapshotId()); - nextPathSnapshotKey = snapshotChainManager - .getTableKey(nextPathSnapshotId); + nextPathSnapshotKey = snapshotChainManager.getTableKey(nextPathSnapshotId); // Acquire lock from the snapshot acquireLock(lockSet, nextPathSnapshotKey, metadataManager); @@ -262,19 +282,16 @@ private void updateSnapshotChainAndCache( } SnapshotInfo nextPathSnapInfo = - nextPathSnapshotKey != null ? metadataManager.getSnapshotInfoTable().get(nextPathSnapshotKey) : null; + nextPathSnapshotKey != null ? snapshotInfoTable.get(nextPathSnapshotKey) : null; SnapshotInfo nextGlobalSnapInfo = - nextGlobalSnapshotKey != null ? metadataManager.getSnapshotInfoTable().get(nextGlobalSnapshotKey) : null; + nextGlobalSnapshotKey != null ? snapshotInfoTable.get(nextGlobalSnapshotKey) : null; // Updates next path snapshot's previous snapshot ID if (nextPathSnapInfo != null) { nextPathSnapInfo.setPathPreviousSnapshotId(snapInfo.getPathPreviousSnapshotId()); - metadataManager.getSnapshotInfoTable().addCacheEntry( - new CacheKey<>(nextPathSnapInfo.getTableKey()), + snapshotInfoTable.addCacheEntry(new CacheKey<>(nextPathSnapInfo.getTableKey()), CacheValue.get(trxnLogIndex, nextPathSnapInfo)); - updatedPathPreviousAndGlobalSnapshots - .put(nextPathSnapInfo.getTableKey(), nextPathSnapInfo); } // Updates next global snapshot's previous snapshot ID @@ -282,23 +299,18 @@ private void updateSnapshotChainAndCache( // nextPathSnapInfo.setPathPreviousSnapshotID(), adding this check // will prevent it. if (nextGlobalSnapInfo != null && nextPathSnapInfo != null && - nextGlobalSnapInfo.getSnapshotId().equals(nextPathSnapInfo.getSnapshotId())) { + Objects.equals(nextGlobalSnapInfo.getSnapshotId(), nextPathSnapInfo.getSnapshotId())) { nextPathSnapInfo.setGlobalPreviousSnapshotId(snapInfo.getGlobalPreviousSnapshotId()); - metadataManager.getSnapshotInfoTable().addCacheEntry( - new CacheKey<>(nextPathSnapInfo.getTableKey()), + snapshotInfoTable.addCacheEntry(new CacheKey<>(nextPathSnapInfo.getTableKey()), CacheValue.get(trxnLogIndex, nextPathSnapInfo)); - updatedPathPreviousAndGlobalSnapshots - .put(nextPathSnapInfo.getTableKey(), nextPathSnapInfo); + nextGlobalSnapInfo = nextPathSnapInfo; } else if (nextGlobalSnapInfo != null) { - nextGlobalSnapInfo.setGlobalPreviousSnapshotId( - snapInfo.getGlobalPreviousSnapshotId()); - metadataManager.getSnapshotInfoTable().addCacheEntry( - new CacheKey<>(nextGlobalSnapInfo.getTableKey()), + nextGlobalSnapInfo.setGlobalPreviousSnapshotId(snapInfo.getGlobalPreviousSnapshotId()); + snapshotInfoTable.addCacheEntry(new CacheKey<>(nextGlobalSnapInfo.getTableKey()), CacheValue.get(trxnLogIndex, nextGlobalSnapInfo)); - updatedPathPreviousAndGlobalSnapshots - .put(nextGlobalSnapInfo.getTableKey(), nextGlobalSnapInfo); } snapshotChainManager.deleteSnapshot(snapInfo); + return Pair.of(nextPathSnapInfo, nextGlobalSnapInfo); } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java index 45b0c5e0590..f9228cf0e6f 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java @@ -35,7 +35,6 @@ import java.io.IOException; import java.nio.file.Path; import java.util.List; -import java.util.Map; import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.SNAPSHOT_INFO_TABLE; import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.SNAPSHOT_LOCK; @@ -45,22 +44,27 @@ */ @CleanupTableInfo(cleanupTables = {SNAPSHOT_INFO_TABLE}) public class OMSnapshotPurgeResponse extends OMClientResponse { - private static final Logger LOG = - LoggerFactory.getLogger(OMSnapshotPurgeResponse.class); - private final List snapshotDbKeys; - private final Map updatedSnapInfos; - private final Map updatedPreviousAndGlobalSnapInfos; - - public OMSnapshotPurgeResponse( - @Nonnull OMResponse omResponse, - @Nonnull List snapshotDbKeys, - Map updatedSnapInfos, - Map updatedPreviousAndGlobalSnapInfos - ) { + private static final Logger LOG = LoggerFactory.getLogger(OMSnapshotPurgeResponse.class); + + private final OmPurgeResponse omPurgeResponse; + + /** + * This is for the backward compatibility when OMSnapshotPurgeRequest has list of snapshots to purge. + */ + @Deprecated + private final List omPurgeResponses; + + public OMSnapshotPurgeResponse(@Nonnull OMResponse omResponse, OmPurgeResponse omPurgeResponse) { super(omResponse); - this.snapshotDbKeys = snapshotDbKeys; - this.updatedSnapInfos = updatedSnapInfos; - this.updatedPreviousAndGlobalSnapInfos = updatedPreviousAndGlobalSnapInfos; + this.omPurgeResponse = omPurgeResponse; + this.omPurgeResponses = null; + } + + @Deprecated + public OMSnapshotPurgeResponse(@Nonnull OMResponse omResponse, List omPurgeResponses) { + super(omResponse); + this.omPurgeResponse = null; + this.omPurgeResponses = omPurgeResponses; } /** @@ -70,45 +74,58 @@ public OMSnapshotPurgeResponse( public OMSnapshotPurgeResponse(@Nonnull OMResponse omResponse) { super(omResponse); checkStatusNotOK(); - this.snapshotDbKeys = null; - this.updatedSnapInfos = null; - this.updatedPreviousAndGlobalSnapInfos = null; + this.omPurgeResponse = null; + this.omPurgeResponses = null; } @Override - protected void addToDBBatch(OMMetadataManager omMetadataManager, - BatchOperation batchOperation) throws IOException { - - OmMetadataManagerImpl metadataManager = (OmMetadataManagerImpl) - omMetadataManager; - updateSnapInfo(metadataManager, batchOperation, - updatedPreviousAndGlobalSnapInfos); - updateSnapInfo(metadataManager, batchOperation, updatedSnapInfos); - for (String dbKey: snapshotDbKeys) { - // Skip the cache here because snapshot is purged from cache in OMSnapshotPurgeRequest. - SnapshotInfo snapshotInfo = omMetadataManager - .getSnapshotInfoTable().getSkipCache(dbKey); - // Even though snapshot existed when SnapshotDeletingService - // was running. It might be deleted in the previous run and - // the DB might not have been updated yet. So snapshotInfo - // can be null. - if (snapshotInfo == null) { - continue; + protected void addToDBBatch(OMMetadataManager omMetadataManager, BatchOperation batchOperation) throws IOException { + if (omPurgeResponse != null) { + addToDbBatch(omPurgeResponse, omMetadataManager, batchOperation); + } else if (omPurgeResponses != null) { + for (OmPurgeResponse purgeResponse : omPurgeResponses) { + addToDbBatch(purgeResponse, omMetadataManager, batchOperation); } + } else { + throw new IllegalStateException("One of snapshotPurgeResponse or snapshotPurgeResponses should be present"); + } + } + + private void addToDbBatch(OmPurgeResponse purgeResponse, + OMMetadataManager omMetadataManager, + BatchOperation batchOperation) throws IOException { + OmMetadataManagerImpl metadataManager = (OmMetadataManagerImpl) omMetadataManager; + // Order of transactions is flush next path level snapshot updates followed by next global snapshot and + // next active snapshot. This order should not be changed unless the original order of the operations + // is changed in OmSnapshotPurgeRequest. + updateSnapInfo(metadataManager, batchOperation, purgeResponse.nextPathSnapshotInfo); + updateSnapInfo(metadataManager, batchOperation, purgeResponse.nextGlobalSnapshotInfo); + updateSnapInfo(metadataManager, batchOperation, purgeResponse.nextActiveSnapshotInfo); + + // Skip the cache here because snapshot is purged from cache in OMSnapshotPurgeRequest. + SnapshotInfo snapshotInfo = omMetadataManager.getSnapshotInfoTable() + .getSkipCache(purgeResponse.snapshotTableKey); + + // Even though snapshot existed when SnapshotDeletingService was running. + // It might be deleted in the previous run and the DB might not have been updated yet. + if (snapshotInfo != null) { // Delete Snapshot checkpoint directory. deleteCheckpointDirectory(omMetadataManager, snapshotInfo); - omMetadataManager.getSnapshotInfoTable().deleteWithBatch(batchOperation, dbKey); + // Finally, delete the snapshot. + omMetadataManager.getSnapshotInfoTable().deleteWithBatch(batchOperation, purgeResponse.snapshotTableKey); + } else { + LOG.warn("Snapshot: '{}' is no longer exist in snapshot table. Might be removed in previous run.", + purgeResponse.snapshotTableKey); } } private void updateSnapInfo(OmMetadataManagerImpl metadataManager, BatchOperation batchOp, - Map snapshotInfos) + SnapshotInfo snapshotInfo) throws IOException { - for (Map.Entry entry : snapshotInfos.entrySet()) { - metadataManager.getSnapshotInfoTable().putWithBatch(batchOp, - entry.getKey(), entry.getValue()); + if (snapshotInfo != null) { + metadataManager.getSnapshotInfoTable().putWithBatch(batchOp, snapshotInfo.getTableKey(), snapshotInfo); } } @@ -137,4 +154,36 @@ private void deleteCheckpointDirectory(OMMetadataManager omMetadataManager, } } } + + /** + * POJO to maintain the order of transactions when purge API is called with batch. + */ + public static final class OmPurgeResponse { + private final String snapshotTableKey; + private final SnapshotInfo nextPathSnapshotInfo; + private final SnapshotInfo nextGlobalSnapshotInfo; + private final SnapshotInfo nextActiveSnapshotInfo; + + public OmPurgeResponse(@Nonnull String snapshotTableKey, + SnapshotInfo nextPathSnapshotInfo, + SnapshotInfo nextGlobalSnapshotInfo, + SnapshotInfo nextActiveSnapshotInfo) { + this.snapshotTableKey = snapshotTableKey; + this.nextPathSnapshotInfo = nextPathSnapshotInfo; + this.nextGlobalSnapshotInfo = nextGlobalSnapshotInfo; + this.nextActiveSnapshotInfo = nextActiveSnapshotInfo; + } + + @Override + public String toString() { + return "{snapshotTableKey: '" + snapshotTableKey + '\'' + + ", nextPathSnapshotInfo: '" + + (nextPathSnapshotInfo != null ? nextPathSnapshotInfo.getName() : null) + '\'' + + ", nextGlobalSnapshotInfo: '" + + (nextGlobalSnapshotInfo != null ? nextGlobalSnapshotInfo.getName() : null) + '\'' + + ", nextActiveSnapshotInfo: '" + + (nextActiveSnapshotInfo != null ? nextActiveSnapshotInfo.getName() : null) + '\'' + + '}'; + } + } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java index 99e3903447d..e48212a30f9 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java @@ -451,10 +451,10 @@ private long handleDirectoryCleanUp( } private void submitSnapshotPurgeRequest(List purgeSnapshotKeys) { - if (!purgeSnapshotKeys.isEmpty()) { + for (String snapshotKey: purgeSnapshotKeys) { SnapshotPurgeRequest snapshotPurgeRequest = SnapshotPurgeRequest .newBuilder() - .addAllSnapshotDBKeys(purgeSnapshotKeys) + .setSnapshotKey(snapshotKey) .build(); OMRequest omRequest = OMRequest.newBuilder() diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotPurgeRequestAndResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotPurgeRequestAndResponse.java index 8edd096e766..ccd775f435e 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotPurgeRequestAndResponse.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotPurgeRequestAndResponse.java @@ -56,7 +56,6 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Random; @@ -153,19 +152,17 @@ private List createSnapshots(int numSnapshotKeys) * * @return OMRequest */ - private OMRequest createPurgeKeysRequest(List purgeSnapshotKeys) { + private OMRequest createPurgeKeysRequest(String snapshotKey) { SnapshotPurgeRequest snapshotPurgeRequest = SnapshotPurgeRequest .newBuilder() - .addAllSnapshotDBKeys(purgeSnapshotKeys) + .setSnapshotKey(snapshotKey) .build(); - OMRequest omRequest = OMRequest.newBuilder() + return OMRequest.newBuilder() .setCmdType(Type.SnapshotPurge) .setSnapshotPurgeRequest(snapshotPurgeRequest) .setClientId(UUID.randomUUID().toString()) .build(); - - return omRequest; } /** @@ -240,18 +237,67 @@ public void testValidateAndUpdateCache() throws Exception { List snapshotDbKeysToPurge = createSnapshots(10); assertFalse(omMetadataManager.getSnapshotInfoTable().isEmpty()); - OMRequest snapshotPurgeRequest = createPurgeKeysRequest( - snapshotDbKeysToPurge); + List omSnapshotPurgeResponses = new ArrayList<>(); + long transactionLogIndex = 200; - OMSnapshotPurgeRequest omSnapshotPurgeRequest = preExecute(snapshotPurgeRequest); + for (String snapshotKey: snapshotDbKeysToPurge) { + OMRequest omPurgeRequest = createPurgeKeysRequest(snapshotKey); + OMSnapshotPurgeRequest omSnapshotPurgeRequest = preExecute(omPurgeRequest); - OMSnapshotPurgeResponse omSnapshotPurgeResponse = (OMSnapshotPurgeResponse) - omSnapshotPurgeRequest.validateAndUpdateCache(ozoneManager, 200L); + omSnapshotPurgeResponses.add((OMSnapshotPurgeResponse) + omSnapshotPurgeRequest.validateAndUpdateCache(ozoneManager, transactionLogIndex++)); + } for (String snapshotTableKey: snapshotDbKeysToPurge) { assertNull(omMetadataManager.getSnapshotInfoTable().get(snapshotTableKey)); } + for (OMSnapshotPurgeResponse omSnapshotPurgeResponse: omSnapshotPurgeResponses) { + try (BatchOperation batchOperation = omMetadataManager.getStore().initBatchOperation()) { + omSnapshotPurgeResponse.checkAndUpdateDB(omMetadataManager, batchOperation); + omMetadataManager.getStore().commitBatchOperation(batchOperation); + } + } + + // Check if the entries are deleted. + assertTrue(omMetadataManager.getSnapshotInfoTable().isEmpty()); + + // Check if all the checkpoints are cleared. + for (Path checkpoint : checkpointPaths) { + assertFalse(Files.exists(checkpoint)); + } + assertEquals(initialSnapshotPurgeCount + snapshotDbKeysToPurge.size(), omMetrics.getNumSnapshotPurges()); + assertEquals(initialSnapshotPurgeFailCount, omMetrics.getNumSnapshotPurgeFails()); + } + + @Test + public void testValidateAndUpdateCacheWithBatch() throws Exception { + long initialSnapshotPurgeCount = omMetrics.getNumSnapshotPurges(); + long initialSnapshotPurgeFailCount = omMetrics.getNumSnapshotPurgeFails(); + + List snapshotDbKeysToPurge = createSnapshots(10); + assertFalse(omMetadataManager.getSnapshotInfoTable().isEmpty()); + + long transactionLogIndex = 200; + + OMRequest omPurgeRequest = OMRequest.newBuilder() + .setCmdType(Type.SnapshotPurge) + .setSnapshotPurgeRequest(SnapshotPurgeRequest + .newBuilder() + .addAllSnapshotDBKeys(snapshotDbKeysToPurge) + .build()) + .setClientId(UUID.randomUUID().toString()) + .build(); + + OMSnapshotPurgeRequest omSnapshotPurgeRequest = preExecute(omPurgeRequest); + + OMSnapshotPurgeResponse omSnapshotPurgeResponse = (OMSnapshotPurgeResponse) + omSnapshotPurgeRequest.validateAndUpdateCache(ozoneManager, transactionLogIndex); + + for (String snapshotDbKey: snapshotDbKeysToPurge) { + assertNull(omMetadataManager.getSnapshotInfoTable().get(snapshotDbKey)); + } + try (BatchOperation batchOperation = omMetadataManager.getStore().initBatchOperation()) { omSnapshotPurgeResponse.checkAndUpdateDB(omMetadataManager, batchOperation); omMetadataManager.getStore().commitBatchOperation(batchOperation); @@ -264,6 +310,7 @@ public void testValidateAndUpdateCache() throws Exception { for (Path checkpoint : checkpointPaths) { assertFalse(Files.exists(checkpoint)); } + assertEquals(initialSnapshotPurgeCount + 1, omMetrics.getNumSnapshotPurges()); assertEquals(initialSnapshotPurgeFailCount, omMetrics.getNumSnapshotPurgeFails()); } @@ -285,15 +332,19 @@ public void testValidateAndUpdateCacheFailure() throws Exception { when(mockedMetadataManager.getSnapshotInfoTable()).thenReturn(mockedSnapshotInfoTable); when(ozoneManager.getMetadataManager()).thenReturn(mockedMetadataManager); - OMRequest snapshotPurgeRequest = createPurgeKeysRequest(snapshotDbKeysToPurge); - OMSnapshotPurgeRequest omSnapshotPurgeRequest = preExecute(snapshotPurgeRequest); + for (String snapshotKey: snapshotDbKeysToPurge) { + OMRequest snapshotPurgeRequest = createPurgeKeysRequest(snapshotKey); + OMSnapshotPurgeRequest omSnapshotPurgeRequest = preExecute(snapshotPurgeRequest); - OMSnapshotPurgeResponse omSnapshotPurgeResponse = (OMSnapshotPurgeResponse) - omSnapshotPurgeRequest.validateAndUpdateCache(ozoneManager, 200L); + OMSnapshotPurgeResponse omSnapshotPurgeResponse = (OMSnapshotPurgeResponse) + omSnapshotPurgeRequest.validateAndUpdateCache(ozoneManager, 200L); + + assertEquals(INTERNAL_ERROR, omSnapshotPurgeResponse.getOMResponse().getStatus()); + } - assertEquals(INTERNAL_ERROR, omSnapshotPurgeResponse.getOMResponse().getStatus()); assertEquals(initialSnapshotPurgeCount, omMetrics.getNumSnapshotPurges()); - assertEquals(initialSnapshotPurgeFailCount + 1, omMetrics.getNumSnapshotPurgeFails()); + assertEquals(initialSnapshotPurgeFailCount + snapshotDbKeysToPurge.size(), + omMetrics.getNumSnapshotPurgeFails()); } // TODO: clean up: Do we this test after @@ -342,10 +393,8 @@ public void testSnapshotChainCleanup(int index) throws Exception { long rowsInTableBeforePurge = omMetadataManager .countRowsInTable(omMetadataManager.getSnapshotInfoTable()); - // Purge Snapshot of the given index. - List toPurgeList = Collections.singletonList(snapShotToPurge); - OMRequest snapshotPurgeRequest = createPurgeKeysRequest( - toPurgeList); + + OMRequest snapshotPurgeRequest = createPurgeKeysRequest(snapShotToPurge); purgeSnapshots(snapshotPurgeRequest); // After purge, check snapshot chain. @@ -453,17 +502,15 @@ public void testSnapshotChainInSnapshotInfoTableAfterSnapshotPurge( validateSnapshotOrderInSnapshotInfoTableAndSnapshotChain(snapshotInfoList); - List purgeSnapshotKeys = new ArrayList<>(); for (int i = fromIndex; i <= toIndex; i++) { SnapshotInfo purgeSnapshotInfo = snapshotInfoList.get(i); String purgeSnapshotKey = SnapshotInfo.getTableKey(volumeName, purgeSnapshotInfo.getBucketName(), purgeSnapshotInfo.getName()); - purgeSnapshotKeys.add(purgeSnapshotKey); + OMRequest snapshotPurgeRequest = createPurgeKeysRequest(purgeSnapshotKey); + purgeSnapshots(snapshotPurgeRequest); } - OMRequest snapshotPurgeRequest = createPurgeKeysRequest(purgeSnapshotKeys); - purgeSnapshots(snapshotPurgeRequest); List snapshotInfoListAfterPurge = new ArrayList<>(); for (int i = 0; i < totalKeys; i++) {