Skip to content

Commit

Permalink
HDDS-11137. Removed locks from SnapshotPurge and SnapshotSetProperty …
Browse files Browse the repository at this point in the history
…APIs.
  • Loading branch information
hemantk-12 committed Aug 7, 2024
1 parent 2c0e0b1 commit 57458e0
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@

package org.apache.hadoop.ozone.om.request.snapshot;

import org.apache.commons.lang3.tuple.Triple;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OMMetrics;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
Expand All @@ -44,15 +41,11 @@

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.UUID;

import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.SNAPSHOT_LOCK;

/**
* Handles OMSnapshotPurge Request.
* This is an OM internal request. Does not need @RequireSnapshotFeatureState.
Expand Down Expand Up @@ -92,62 +85,34 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
new HashMap<>();

// Each snapshot purge operation does three things:
// 1. Update the snapshot chain,
// 2. Update the deep clean flag for the next active snapshot (So that it can be
// 1. Update the deep clean flag for the next active snapshot (So that it can be
// deep cleaned by the KeyDeletingService in the next run),
// 2. Update the snapshot chain,
// 3. Finally, purge the snapshot.
// All of these steps have to be performed only when it acquires all the necessary
// locks (lock on the snapshot to be purged, lock on the next active snapshot, and
// lock on the next path and global previous snapshots). Ideally, there is no need
// for locks for snapshot purge and can rely on OMStateMachine because OMStateMachine
// is going to process each request sequentially.
//
// But there is a problem with that. After filtering unnecessary SST files for a snapshot,
// SstFilteringService updates that snapshot's SstFilter flag. SstFilteringService cannot
// use SetSnapshotProperty API because it runs on each OM independently and One OM does
// not know if the snapshot has been filtered on the other OM in HA environment.
//
// If locks are not taken snapshot purge and SstFilteringService will cause a race condition
// and override one's update with another.
// There is no need to take lock for snapshot purge as of now. We can simply rely on OMStateMachine
// because it executes transaction sequentially.
for (String snapTableKey : snapshotDbKeys) {
// To acquire all the locks, a set is maintained which is keyed by snapshotTableKey.
// snapshotTableKey is nothing but /volumeName/bucketName/snapshotName.
// Once all the locks are acquired, it performs the three steps mentioned above and
// release all the locks after that.
Set<Triple<String, String, String>> lockSet = new HashSet<>(4, 1);
try {
if (omMetadataManager.getSnapshotInfoTable().get(snapTableKey) == null) {
// Snapshot may have been purged in the previous iteration of SnapshotDeletingService.
LOG.warn("The snapshot {} is not longer in snapshot table, It maybe removed in the previous " +
"Snapshot purge request.", snapTableKey);
continue;
}

acquireLock(lockSet, snapTableKey, omMetadataManager);
SnapshotInfo fromSnapshot = omMetadataManager.getSnapshotInfoTable().get(snapTableKey);

SnapshotInfo nextSnapshot =
SnapshotUtils.getNextActiveSnapshot(fromSnapshot, snapshotChainManager, omSnapshotManager);

if (nextSnapshot != null) {
acquireLock(lockSet, nextSnapshot.getTableKey(), omMetadataManager);
}

// Update the chain first so that it has all the necessary locks before updating deep clean.
updateSnapshotChainAndCache(lockSet, omMetadataManager, fromSnapshot, trxnLogIndex,
updatedPathPreviousAndGlobalSnapshots);
updateSnapshotInfoAndCache(nextSnapshot, omMetadataManager, trxnLogIndex, updatedSnapInfos);
// Remove and close snapshot's RocksDB instance from SnapshotCache.
omSnapshotManager.invalidateCacheEntry(fromSnapshot.getSnapshotId());
// Update SnapshotInfoTable cache.
omMetadataManager.getSnapshotInfoTable()
.addCacheEntry(new CacheKey<>(fromSnapshot.getTableKey()), CacheValue.get(trxnLogIndex));
} finally {
for (Triple<String, String, String> lockKey: lockSet) {
omMetadataManager.getLock()
.releaseWriteLock(SNAPSHOT_LOCK, lockKey.getLeft(), lockKey.getMiddle(), lockKey.getRight());
}
SnapshotInfo fromSnapshot = omMetadataManager.getSnapshotInfoTable().get(snapTableKey);
if (fromSnapshot == null) {
// Snapshot may have been purged in the previous iteration of SnapshotDeletingService.
LOG.warn("The snapshot {} is not longer in snapshot table, It maybe removed in the previous " +
"Snapshot purge request.", snapTableKey);
continue;
}

SnapshotInfo nextSnapshot =
SnapshotUtils.getNextActiveSnapshot(fromSnapshot, snapshotChainManager, omSnapshotManager);

// Step 1: Update the deep clean flag for the next active snapshot
updateSnapshotInfoAndCache(nextSnapshot, omMetadataManager, trxnLogIndex, updatedSnapInfos);
// Step 2: Update the snapshot chain.
updateSnapshotChainAndCache(omMetadataManager, fromSnapshot, trxnLogIndex,
updatedPathPreviousAndGlobalSnapshots);
// Remove and close snapshot's RocksDB instance from SnapshotCache.
omSnapshotManager.invalidateCacheEntry(fromSnapshot.getSnapshotId());
// Step 3: Purge the snapshot from SnapshotInfoTable cache.
omMetadataManager.getSnapshotInfoTable()
.addCacheEntry(new CacheKey<>(fromSnapshot.getTableKey()), CacheValue.get(trxnLogIndex));
}

omClientResponse = new OMSnapshotPurgeResponse(omResponse.build(),
Expand All @@ -168,41 +133,19 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
return omClientResponse;
}

private void acquireLock(Set<Triple<String, String, String>> lockSet, String snapshotTableKey,
OMMetadataManager omMetadataManager) throws IOException {
SnapshotInfo snapshotInfo = omMetadataManager.getSnapshotInfoTable().get(snapshotTableKey);

// It should not be the case that lock is required for non-existing snapshot.
if (snapshotInfo == null) {
LOG.error("Snapshot: '{}' doesn't not exist in snapshot table.", snapshotTableKey);
throw new OMException("Snapshot: '{" + snapshotTableKey + "}' doesn't not exist in snapshot table.",
OMException.ResultCodes.FILE_NOT_FOUND);
}
Triple<String, String, String> lockKey = Triple.of(snapshotInfo.getVolumeName(), snapshotInfo.getBucketName(),
snapshotInfo.getName());
if (!lockSet.contains(lockKey)) {
mergeOmLockDetails(omMetadataManager.getLock()
.acquireWriteLock(SNAPSHOT_LOCK, lockKey.getLeft(), lockKey.getMiddle(), lockKey.getRight()));
lockSet.add(lockKey);
}
}

private void updateSnapshotInfoAndCache(SnapshotInfo snapInfo,
OmMetadataManagerImpl omMetadataManager, long trxnLogIndex,
Map<String, SnapshotInfo> updatedSnapInfos) throws IOException {
Map<String, SnapshotInfo> updatedSnapInfos) throws IOException {
if (snapInfo != null) {
// Fetch the latest value again after acquiring lock.
SnapshotInfo updatedSnapshotInfo = omMetadataManager.getSnapshotInfoTable().get(snapInfo.getTableKey());

// Setting next snapshot deep clean to false, Since the
// current snapshot is deleted. We can potentially
// reclaim more keys in the next snapshot.
updatedSnapshotInfo.setDeepClean(false);
snapInfo.setDeepClean(false);

// Update table cache first
omMetadataManager.getSnapshotInfoTable().addCacheEntry(new CacheKey<>(updatedSnapshotInfo.getTableKey()),
CacheValue.get(trxnLogIndex, updatedSnapshotInfo));
updatedSnapInfos.put(updatedSnapshotInfo.getTableKey(), updatedSnapshotInfo);
omMetadataManager.getSnapshotInfoTable().addCacheEntry(new CacheKey<>(snapInfo.getTableKey()),
CacheValue.get(trxnLogIndex, snapInfo));
updatedSnapInfos.put(snapInfo.getTableKey(), snapInfo);
}
}

Expand All @@ -213,7 +156,6 @@ private void updateSnapshotInfoAndCache(SnapshotInfo snapInfo,
* update in DB.
*/
private void updateSnapshotChainAndCache(
Set<Triple<String, String, String>> lockSet,
OmMetadataManagerImpl metadataManager,
SnapshotInfo snapInfo,
long trxnLogIndex,
Expand Down Expand Up @@ -247,18 +189,12 @@ private void updateSnapshotChainAndCache(
snapInfo.getSnapshotPath(), snapInfo.getSnapshotId());
nextPathSnapshotKey = snapshotChainManager
.getTableKey(nextPathSnapshotId);

// Acquire lock from the snapshot
acquireLock(lockSet, nextPathSnapshotKey, metadataManager);
}

String nextGlobalSnapshotKey = null;
if (hasNextGlobalSnapshot) {
UUID nextGlobalSnapshotId = snapshotChainManager.nextGlobalSnapshot(snapInfo.getSnapshotId());
nextGlobalSnapshotKey = snapshotChainManager.getTableKey(nextGlobalSnapshotId);

// Acquire lock from the snapshot
acquireLock(lockSet, nextGlobalSnapshotKey, metadataManager);
}

SnapshotInfo nextPathSnapInfo =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.io.IOException;

import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_NOT_FOUND;
import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.SNAPSHOT_LOCK;

/**
* Updates the exclusive size of the snapshot.
Expand All @@ -55,41 +54,24 @@ public OMSnapshotSetPropertyRequest(OMRequest omRequest) {
public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIndex termIndex) {
OMMetrics omMetrics = ozoneManager.getMetrics();

OMClientResponse omClientResponse = null;
OMClientResponse omClientResponse;
OMMetadataManager metadataManager = ozoneManager.getMetadataManager();

OzoneManagerProtocolProtos.OMResponse.Builder omResponse =
OmResponseUtil.getOMResponseBuilder(getOmRequest());
OzoneManagerProtocolProtos.SetSnapshotPropertyRequest
setSnapshotPropertyRequest = getOmRequest()
.getSetSnapshotPropertyRequest();
SnapshotInfo updatedSnapInfo = null;

String snapshotKey = setSnapshotPropertyRequest.getSnapshotKey();
boolean acquiredSnapshotLock = false;
String volumeName = null;
String bucketName = null;
String snapshotName = null;

try {
SnapshotInfo snapshotInfo = metadataManager.getSnapshotInfoTable().get(snapshotKey);
if (snapshotInfo == null) {
SnapshotInfo updatedSnapInfo = metadataManager.getSnapshotInfoTable().get(snapshotKey);
if (updatedSnapInfo == null) {
LOG.error("Snapshot: '{}' doesn't not exist in snapshot table.", snapshotKey);
throw new OMException("Snapshot: '{" + snapshotKey + "}' doesn't not exist in snapshot table.", FILE_NOT_FOUND);
}

volumeName = snapshotInfo.getVolumeName();
bucketName = snapshotInfo.getBucketName();
snapshotName = snapshotInfo.getName();

mergeOmLockDetails(metadataManager.getLock()
.acquireWriteLock(SNAPSHOT_LOCK, volumeName, bucketName, snapshotName));

acquiredSnapshotLock = getOmLockDetails().isLockAcquired();

updatedSnapInfo = metadataManager.getSnapshotInfoTable()
.get(snapshotKey);


if (setSnapshotPropertyRequest.hasDeepCleanedDeletedDir()) {
updatedSnapInfo.setDeepCleanedDeletedDir(setSnapshotPropertyRequest
Expand Down Expand Up @@ -126,14 +108,6 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
createErrorOMResponse(omResponse, ex));
omMetrics.incNumSnapshotSetPropertyFails();
LOG.error("Failed to execute snapshotSetPropertyRequest: {{}}.", setSnapshotPropertyRequest, ex);
} finally {
if (acquiredSnapshotLock) {
mergeOmLockDetails(metadataManager.getLock()
.releaseWriteLock(SNAPSHOT_LOCK, volumeName, bucketName, snapshotName));
}
if (omClientResponse != null) {
omClientResponse.setOmLockDetails(getOmLockDetails());
}
}

return omClientResponse;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ protected void addToDBBatch(OMMetadataManager omMetadataManager,

OmMetadataManagerImpl metadataManager = (OmMetadataManagerImpl)
omMetadataManager;
updateSnapInfo(metadataManager, batchOperation, updatedSnapInfos);
updateSnapInfo(metadataManager, batchOperation,
updatedPreviousAndGlobalSnapInfos);
updateSnapInfo(metadataManager, batchOperation, updatedSnapInfos);
for (String dbKey: snapshotDbKeys) {
// Skip the cache here because snapshot is purged from cache in OMSnapshotPurgeRequest.
SnapshotInfo snapshotInfo = omMetadataManager
Expand Down

0 comments on commit 57458e0

Please sign in to comment.