Skip to content

Commit

Permalink
Maintain local cache in OMSnapshotPurgeRequest to get updated snapsho…
Browse files Browse the repository at this point in the history
…tInfo and pass the same to OMSnapshotPurgeResponse.
  • Loading branch information
hemantk-12 committed Aug 6, 2024
1 parent 7bac0a0 commit 17f47de
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.hadoop.ozone.om.request.snapshot;

import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OMMetrics;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
Expand Down Expand Up @@ -54,6 +55,13 @@ public class OMSnapshotPurgeRequest extends OMClientRequest {

private static final Logger LOG = LoggerFactory.getLogger(OMSnapshotPurgeRequest.class);

/**
* This map contains up to date snapshotInfo and works as a local cache for OMSnapshotPurgeRequest.
* Since purge and other updates happen in sequence inside validateAndUpdateCache, we can get updated snapshotInfo
* from this map rather than getting form snapshotInfoTable which creates a deep copy for every get call.
*/
private final Map<String, SnapshotInfo> updatedSnapshotInfos = new HashMap<>();

public OMSnapshotPurgeRequest(OMRequest omRequest) {
super(omRequest);
}
Expand All @@ -80,19 +88,16 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
try {
List<String> snapshotDbKeys = snapshotPurgeRequest
.getSnapshotDBKeysList();
Map<String, SnapshotInfo> updatedSnapInfos = new HashMap<>();
Map<String, SnapshotInfo> updatedPathPreviousAndGlobalSnapshots =
new HashMap<>();

// Each snapshot purge operation does three things:
// 1. Update the deep clean flag for the next active snapshot (So that it can be
// deep cleaned by the KeyDeletingService in the next run)
// 2. Update the snapshot chain,,
// deep cleaned by the KeyDeletingService in the next run),
// 2. Update the snapshot chain,
// 3. Finally, purge the snapshot.
// There is no need to take lock for snapshot purge as of now. We can simply rely on OMStateMachine
// because it executes transaction sequentially.
for (String snapTableKey : snapshotDbKeys) {
SnapshotInfo fromSnapshot = omMetadataManager.getSnapshotInfoTable().get(snapTableKey);
SnapshotInfo fromSnapshot = getUpdatedSnapshotInfo(snapTableKey, omMetadataManager);
if (fromSnapshot == null) {
// Snapshot may have been purged in the previous iteration of SnapshotDeletingService.
LOG.warn("The snapshot {} is not longer in snapshot table, It maybe removed in the previous " +
Expand All @@ -104,25 +109,21 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
SnapshotUtils.getNextActiveSnapshot(fromSnapshot, snapshotChainManager, omSnapshotManager);

// Step 1: Update the deep clean flag for the next active snapshot
updateSnapshotInfoAndCache(nextSnapshot, omMetadataManager, trxnLogIndex, updatedSnapInfos);
updateSnapshotInfoAndCache(nextSnapshot, omMetadataManager, trxnLogIndex);
// Step 2: Update the snapshot chain.
updateSnapshotChainAndCache(omMetadataManager, fromSnapshot, trxnLogIndex,
updatedPathPreviousAndGlobalSnapshots);
updateSnapshotChainAndCache(omMetadataManager, fromSnapshot, trxnLogIndex);
// Remove and close snapshot's RocksDB instance from SnapshotCache.
omSnapshotManager.invalidateCacheEntry(fromSnapshot.getSnapshotId());
// Step 3: Purge the snapshot from SnapshotInfoTable cache.
omMetadataManager.getSnapshotInfoTable()
.addCacheEntry(new CacheKey<>(fromSnapshot.getTableKey()), CacheValue.get(trxnLogIndex));
}

omClientResponse = new OMSnapshotPurgeResponse(omResponse.build(),
snapshotDbKeys, updatedSnapInfos,
updatedPathPreviousAndGlobalSnapshots);
omClientResponse = new OMSnapshotPurgeResponse(omResponse.build(), snapshotDbKeys, updatedSnapshotInfos);

omMetrics.incNumSnapshotPurges();
LOG.info("Successfully executed snapshotPurgeRequest: {{}} along with updating deep clean flags for " +
"snapshots: {} and global and previous for snapshots:{}.",
snapshotPurgeRequest, updatedSnapInfos.keySet(), updatedPathPreviousAndGlobalSnapshots.keySet());
LOG.info("Successfully executed snapshotPurgeRequest: {{}} along with updating snapshots:{}.",
snapshotPurgeRequest, updatedSnapshotInfos);
} catch (IOException ex) {
omClientResponse = new OMSnapshotPurgeResponse(
createErrorOMResponse(omResponse, ex));
Expand All @@ -133,9 +134,8 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
return omClientResponse;
}

private void updateSnapshotInfoAndCache(SnapshotInfo snapInfo,
OmMetadataManagerImpl omMetadataManager, long trxnLogIndex,
Map<String, SnapshotInfo> updatedSnapInfos) throws IOException {
private void updateSnapshotInfoAndCache(SnapshotInfo snapInfo, OmMetadataManagerImpl omMetadataManager,
long trxnLogIndex) throws IOException {
if (snapInfo != null) {
// Setting next snapshot deep clean to false, Since the
// current snapshot is deleted. We can potentially
Expand All @@ -145,7 +145,6 @@ private void updateSnapshotInfoAndCache(SnapshotInfo snapInfo,
// Update table cache first
omMetadataManager.getSnapshotInfoTable().addCacheEntry(new CacheKey<>(snapInfo.getTableKey()),
CacheValue.get(trxnLogIndex, snapInfo));
updatedSnapInfos.put(snapInfo.getTableKey(), snapInfo);
}
}

Expand All @@ -158,8 +157,7 @@ private void updateSnapshotInfoAndCache(SnapshotInfo snapInfo,
private void updateSnapshotChainAndCache(
OmMetadataManagerImpl metadataManager,
SnapshotInfo snapInfo,
long trxnLogIndex,
Map<String, SnapshotInfo> updatedPathPreviousAndGlobalSnapshots
long trxnLogIndex
) throws IOException {
if (snapInfo == null) {
return;
Expand Down Expand Up @@ -198,19 +196,17 @@ private void updateSnapshotChainAndCache(
}

SnapshotInfo nextPathSnapInfo =
nextPathSnapshotKey != null ? metadataManager.getSnapshotInfoTable().get(nextPathSnapshotKey) : null;
nextPathSnapshotKey != null ? getUpdatedSnapshotInfo(nextPathSnapshotKey, metadataManager) : null;

SnapshotInfo nextGlobalSnapInfo =
nextGlobalSnapshotKey != null ? metadataManager.getSnapshotInfoTable().get(nextGlobalSnapshotKey) : null;
nextGlobalSnapshotKey != null ? getUpdatedSnapshotInfo(nextGlobalSnapshotKey, metadataManager) : null;

// Updates next path snapshot's previous snapshot ID
if (nextPathSnapInfo != null) {
nextPathSnapInfo.setPathPreviousSnapshotId(snapInfo.getPathPreviousSnapshotId());
metadataManager.getSnapshotInfoTable().addCacheEntry(
new CacheKey<>(nextPathSnapInfo.getTableKey()),
CacheValue.get(trxnLogIndex, nextPathSnapInfo));
updatedPathPreviousAndGlobalSnapshots
.put(nextPathSnapInfo.getTableKey(), nextPathSnapInfo);
}

// Updates next global snapshot's previous snapshot ID
Expand All @@ -223,18 +219,25 @@ private void updateSnapshotChainAndCache(
metadataManager.getSnapshotInfoTable().addCacheEntry(
new CacheKey<>(nextPathSnapInfo.getTableKey()),
CacheValue.get(trxnLogIndex, nextPathSnapInfo));
updatedPathPreviousAndGlobalSnapshots
.put(nextPathSnapInfo.getTableKey(), nextPathSnapInfo);
} else if (nextGlobalSnapInfo != null) {
nextGlobalSnapInfo.setGlobalPreviousSnapshotId(
snapInfo.getGlobalPreviousSnapshotId());
metadataManager.getSnapshotInfoTable().addCacheEntry(
new CacheKey<>(nextGlobalSnapInfo.getTableKey()),
CacheValue.get(trxnLogIndex, nextGlobalSnapInfo));
updatedPathPreviousAndGlobalSnapshots
.put(nextGlobalSnapInfo.getTableKey(), nextGlobalSnapInfo);
}

snapshotChainManager.deleteSnapshot(snapInfo);
}

private SnapshotInfo getUpdatedSnapshotInfo(String snapshotTableKey, OMMetadataManager omMetadataManager)
throws IOException {
if (updatedSnapshotInfos.containsKey(snapshotTableKey)) {
return updatedSnapshotInfos.get(snapshotTableKey);
} else {
SnapshotInfo snapshotInfo = omMetadataManager.getSnapshotInfoTable().get(snapshotTableKey);
updatedSnapshotInfos.put(snapshotTableKey, snapshotInfo);
return snapshotInfo;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,15 @@ public class OMSnapshotPurgeResponse extends OMClientResponse {
LoggerFactory.getLogger(OMSnapshotPurgeResponse.class);
private final List<String> snapshotDbKeys;
private final Map<String, SnapshotInfo> updatedSnapInfos;
private final Map<String, SnapshotInfo> updatedPreviousAndGlobalSnapInfos;

public OMSnapshotPurgeResponse(
@Nonnull OMResponse omResponse,
@Nonnull List<String> snapshotDbKeys,
Map<String, SnapshotInfo> updatedSnapInfos,
Map<String, SnapshotInfo> updatedPreviousAndGlobalSnapInfos
Map<String, SnapshotInfo> updatedSnapInfos
) {
super(omResponse);
this.snapshotDbKeys = snapshotDbKeys;
this.updatedSnapInfos = updatedSnapInfos;
this.updatedPreviousAndGlobalSnapInfos = updatedPreviousAndGlobalSnapInfos;
}

/**
Expand All @@ -72,7 +69,6 @@ public OMSnapshotPurgeResponse(@Nonnull OMResponse omResponse) {
checkStatusNotOK();
this.snapshotDbKeys = null;
this.updatedSnapInfos = null;
this.updatedPreviousAndGlobalSnapInfos = null;
}

@Override
Expand All @@ -82,8 +78,6 @@ protected void addToDBBatch(OMMetadataManager omMetadataManager,
OmMetadataManagerImpl metadataManager = (OmMetadataManagerImpl)
omMetadataManager;
updateSnapInfo(metadataManager, batchOperation, updatedSnapInfos);
updateSnapInfo(metadataManager, batchOperation,
updatedPreviousAndGlobalSnapInfos);
for (String dbKey: snapshotDbKeys) {
// Skip the cache here because snapshot is purged from cache in OMSnapshotPurgeRequest.
SnapshotInfo snapshotInfo = omMetadataManager
Expand Down

0 comments on commit 17f47de

Please sign in to comment.