Skip to content

Commit

Permalink
HDDS-10250. Use SnapshotId as key in SnapshotCache (apache#6139)
Browse files Browse the repository at this point in the history
  • Loading branch information
hemantk-12 authored Feb 10, 2024
1 parent 7c79246 commit c35e99f
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.UUID;

import com.google.common.cache.RemovalListener;
import org.apache.hadoop.hdds.StringUtils;
Expand Down Expand Up @@ -244,7 +245,7 @@ public OmSnapshotManager(OzoneManager ozoneManager) {
OZONE_OM_SNAPSHOT_CACHE_MAX_SIZE,
OZONE_OM_SNAPSHOT_CACHE_MAX_SIZE_DEFAULT);

CacheLoader<String, OmSnapshot> loader = createCacheLoader();
CacheLoader<UUID, OmSnapshot> loader = createCacheLoader();

// TODO: [SNAPSHOT] Remove this if not going to make SnapshotCache impl
// pluggable.
Expand Down Expand Up @@ -325,19 +326,25 @@ public boolean canDisableFsSnapshot(OMMetadataManager ommm) {
return isSnapshotInfoTableEmpty;
}

private CacheLoader<String, OmSnapshot> createCacheLoader() {
return new CacheLoader<String, OmSnapshot>() {
private CacheLoader<UUID, OmSnapshot> createCacheLoader() {
return new CacheLoader<UUID, OmSnapshot>() {

@Nonnull
@Override
public OmSnapshot load(@Nonnull String snapshotTableKey)
throws IOException {
// Check if the snapshot exists
final SnapshotInfo snapshotInfo = getSnapshotInfo(snapshotTableKey);
public OmSnapshot load(@Nonnull UUID snapshotId) throws IOException {
String snapshotTableKey = ((OmMetadataManagerImpl) ozoneManager.getMetadataManager())
.getSnapshotChainManager()
.getTableKey(snapshotId);

// SnapshotChain maintains in-memory reverse mapping of snapshotId to snapshotName based on snapshotInfoTable.
// So it should not happen ideally.
// If it happens, then either snapshot has been purged in between or SnapshotChain is corrupted
// and missing some entries which needs investigation.
if (snapshotTableKey == null) {
throw new IOException("No snapshot exist with snapshotId: " + snapshotId);
}

// Block snapshot from loading when it is no longer active e.g. DELETED,
// unless this is called from SnapshotDeletingService.
checkSnapshotActive(snapshotInfo, true);
final SnapshotInfo snapshotInfo = getSnapshotInfo(snapshotTableKey);

CacheValue<SnapshotInfo> cacheValue = ozoneManager.getMetadataManager()
.getSnapshotInfoTable()
Expand Down Expand Up @@ -417,9 +424,9 @@ public void invalidateCache() {
/**
* Immediately invalidate an entry.
*
* @param key DB snapshot table key
* @param key SnapshotId.
*/
public void invalidateCacheEntry(String key) throws IOException {
public void invalidateCacheEntry(UUID key) throws IOException {
if (snapshotCache != null) {
snapshotCache.invalidate(key);
}
Expand Down Expand Up @@ -663,17 +670,16 @@ private ReferenceCounted<OmSnapshot> getSnapshot(
return getSnapshot(snapshotTableKey, skipActiveCheck);
}

private ReferenceCounted<OmSnapshot> getSnapshot(
String snapshotTableKey,
boolean skipActiveCheck) throws IOException {

private ReferenceCounted<OmSnapshot> getSnapshot(String snapshotTableKey, boolean skipActiveCheck)
throws IOException {
SnapshotInfo snapshotInfo = SnapshotUtils.getSnapshotInfo(ozoneManager, snapshotTableKey);
// Block FS API reads when snapshot is not active.
if (!skipActiveCheck) {
checkSnapshotActive(ozoneManager, snapshotTableKey);
checkSnapshotActive(snapshotInfo, false);
}

// retrieve the snapshot from the cache
return snapshotCache.get(snapshotTableKey);
return snapshotCache.get(snapshotInfo.getSnapshotId());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
trxnLogIndex, updatedSnapInfos);
updateSnapshotChainAndCache(omMetadataManager, fromSnapshot,
trxnLogIndex, updatedPathPreviousAndGlobalSnapshots);
ozoneManager.getOmSnapshotManager().invalidateCacheEntry(snapTableKey);
ozoneManager.getOmSnapshotManager().invalidateCacheEntry(fromSnapshot.getSnapshotId());
}

omClientResponse = new OMSnapshotPurgeResponse(omResponse.build(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_NOT_FOUND;
Expand All @@ -39,26 +40,25 @@ public class SnapshotCache {
static final Logger LOG = LoggerFactory.getLogger(SnapshotCache.class);

// Snapshot cache internal hash map.
// Key: DB snapshot table key
// Key: SnapshotId
// Value: OmSnapshot instance, each holds a DB instance handle inside
// TODO: [SNAPSHOT] Consider wrapping SoftReference<> around IOmMetadataReader
private final ConcurrentHashMap<String, ReferenceCounted<OmSnapshot>> dbMap;
private final ConcurrentHashMap<UUID, ReferenceCounted<OmSnapshot>> dbMap;

private final CacheLoader<UUID, OmSnapshot> cacheLoader;

private final CacheLoader<String, OmSnapshot> cacheLoader;
// Soft-limit of the total number of snapshot DB instances allowed to be
// opened on the OM.
private final int cacheSizeLimit;

public SnapshotCache(
CacheLoader<String, OmSnapshot> cacheLoader,
int cacheSizeLimit) {
public SnapshotCache(CacheLoader<UUID, OmSnapshot> cacheLoader, int cacheSizeLimit) {
this.dbMap = new ConcurrentHashMap<>();
this.cacheLoader = cacheLoader;
this.cacheSizeLimit = cacheSizeLimit;
}

@VisibleForTesting
ConcurrentHashMap<String, ReferenceCounted<OmSnapshot>> getDbMap() {
ConcurrentHashMap<UUID, ReferenceCounted<OmSnapshot>> getDbMap() {
return dbMap;
}

Expand All @@ -71,17 +71,17 @@ public int size() {

/**
* Immediately invalidate an entry.
* @param key DB snapshot table key
* @param key SnapshotId
*/
public void invalidate(String key) throws IOException {
public void invalidate(UUID key) throws IOException {
dbMap.compute(key, (k, v) -> {
if (v == null) {
LOG.warn("Key: '{}' does not exist in cache.", k);
LOG.warn("SnapshotId: '{}' does not exist in snapshot cache.", k);
} else {
try {
v.get().close();
} catch (IOException e) {
throw new IllegalStateException("Failed to close snapshot: " + key, e);
throw new IllegalStateException("Failed to close snapshotId: " + key, e);
}
}
return null;
Expand All @@ -92,11 +92,10 @@ public void invalidate(String key) throws IOException {
* Immediately invalidate all entries and close their DB instances in cache.
*/
public void invalidateAll() {
Iterator<Map.Entry<String, ReferenceCounted<OmSnapshot>>>
it = dbMap.entrySet().iterator();
Iterator<Map.Entry<UUID, ReferenceCounted<OmSnapshot>>> it = dbMap.entrySet().iterator();

while (it.hasNext()) {
Map.Entry<String, ReferenceCounted<OmSnapshot>> entry = it.next();
Map.Entry<UUID, ReferenceCounted<OmSnapshot>> entry = it.next();
OmSnapshot omSnapshot = entry.getValue().get();
try {
// TODO: If wrapped with SoftReference<>, omSnapshot could be null?
Expand All @@ -114,19 +113,18 @@ public void invalidateAll() {
*/
public enum Reason {
FS_API_READ,
SNAPDIFF_READ,
SNAP_DIFF_READ,
DEEP_CLEAN_WRITE,
GARBAGE_COLLECTION_WRITE
}

/**
* Get or load OmSnapshot. Shall be close()d after use.
* TODO: [SNAPSHOT] Can add reason enum to param list later.
* @param key snapshot table key
* @param key SnapshotId
* @return an OmSnapshot instance, or null on error
*/
public ReferenceCounted<OmSnapshot> get(String key)
throws IOException {
public ReferenceCounted<OmSnapshot> get(UUID key) throws IOException {
// Warn if actual cache size exceeds the soft limit already.
if (size() > cacheSizeLimit) {
LOG.warn("Snapshot cache size ({}) exceeds configured soft-limit ({}).",
Expand All @@ -137,9 +135,9 @@ public ReferenceCounted<OmSnapshot> get(String key)
ReferenceCounted<OmSnapshot> rcOmSnapshot =
dbMap.compute(key, (k, v) -> {
if (v == null) {
LOG.info("Loading snapshot. Table key: {}", k);
LOG.info("Loading SnapshotId: '{}'", k);
try {
v = new ReferenceCounted<>(cacheLoader.load(k), false, this);
v = new ReferenceCounted<>(cacheLoader.load(key), false, this);
} catch (OMException omEx) {
// Return null if the snapshot is no longer active
if (!omEx.getResult().equals(FILE_NOT_FOUND)) {
Expand All @@ -163,8 +161,7 @@ public ReferenceCounted<OmSnapshot> get(String key)
if (rcOmSnapshot == null) {
// The only exception that would fall through the loader logic above
// is OMException with FILE_NOT_FOUND.
throw new OMException("Snapshot table key '" + key + "' not found, "
+ "or the snapshot is no longer active",
throw new OMException("SnapshotId: '" + key + "' not found, or the snapshot is no longer active.",
OMException.ResultCodes.FILE_NOT_FOUND);
}

Expand All @@ -179,12 +176,12 @@ public ReferenceCounted<OmSnapshot> get(String key)

/**
* Release the reference count on the OmSnapshot instance.
* @param key snapshot table key
* @param key SnapshotId
*/
public void release(String key) {
public void release(UUID key) {
dbMap.compute(key, (k, v) -> {
if (v == null) {
throw new IllegalArgumentException("Key '" + key + "' does not exist in cache.");
throw new IllegalArgumentException("SnapshotId '" + key + "' does not exist in cache.");
} else {
v.decrementRefCount();
}
Expand All @@ -196,15 +193,6 @@ public void release(String key) {
cleanup();
}

/**
* Alternatively, can release with OmSnapshot instance directly.
* @param omSnapshot OmSnapshot
*/
public void release(OmSnapshot omSnapshot) {
final String snapshotTableKey = omSnapshot.getSnapshotTableKey();
release(snapshotTableKey);
}

/**
* Wrapper for cleanupInternal() that is synchronized to prevent multiple
* threads from interleaving into the cleanup method.
Expand All @@ -221,24 +209,23 @@ private synchronized void cleanup() {
* TODO: [SNAPSHOT] Add new ozone debug CLI command to trigger this directly.
*/
private void cleanupInternal() {
for (Map.Entry<String, ReferenceCounted<OmSnapshot>> entry : dbMap.entrySet()) {
for (Map.Entry<UUID, ReferenceCounted<OmSnapshot>> entry : dbMap.entrySet()) {
dbMap.compute(entry.getKey(), (k, v) -> {
if (v == null) {
throw new IllegalStateException("Key '" + k + "' does not exist in cache. The RocksDB " +
throw new IllegalStateException("SnapshotId '" + k + "' does not exist in cache. The RocksDB " +
"instance of the Snapshot may not be closed properly.");
}

if (v.getTotalRefCount() > 0) {
LOG.debug("Snapshot {} is still being referenced ({}), skipping its clean up",
k, v.getTotalRefCount());
LOG.debug("SnapshotId {} is still being referenced ({}), skipping its clean up.", k, v.getTotalRefCount());
return v;
} else {
LOG.debug("Closing Snapshot {}. It is not being referenced anymore.", k);
LOG.debug("Closing SnapshotId {}. It is not being referenced anymore.", k);
// Close the instance, which also closes its DB handle.
try {
v.get().close();
} catch (IOException ex) {
throw new IllegalStateException("Error while closing snapshot DB", ex);
throw new IllegalStateException("Error while closing snapshot DB.", ex);
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,16 @@ public void testCloseOnEviction() throws IOException {

SnapshotInfo first = createSnapshotInfo(volumeName, bucketName);
SnapshotInfo second = createSnapshotInfo(volumeName, bucketName);
first.setGlobalPreviousSnapshotId(null);
first.setPathPreviousSnapshotId(null);
second.setGlobalPreviousSnapshotId(first.getSnapshotId());
second.setPathPreviousSnapshotId(first.getSnapshotId());

when(snapshotInfoTable.get(first.getTableKey())).thenReturn(first);
when(snapshotInfoTable.get(second.getTableKey())).thenReturn(second);

((OmMetadataManagerImpl) om.getMetadataManager()).getSnapshotChainManager().addSnapshot(first);
((OmMetadataManagerImpl) om.getMetadataManager()).getSnapshotChainManager().addSnapshot(second);
// create the first snapshot checkpoint
OmSnapshotManager.createOmSnapshotCheckpoint(om.getMetadataManager(),
first);
Expand Down
Loading

0 comments on commit c35e99f

Please sign in to comment.