HDDS-11440. Add a lastTransactionInfo field in SnapshotInfo to check for transactions in flight on the snapshot (apache#7179)
swamirishi authored Sep 12, 2024
1 parent e573701 commit d221065
Showing 19 changed files with 351 additions and 110 deletions.
TransactionInfo.java
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.Objects;

import com.google.protobuf.ByteString;
import org.apache.hadoop.hdds.utils.db.Codec;
import org.apache.hadoop.hdds.utils.db.DelegatedCodec;
import org.apache.hadoop.hdds.utils.db.StringCodec;
@@ -162,7 +163,15 @@ public String toString() {
*/
public static TransactionInfo readTransactionInfo(
DBStoreHAManager metadataManager) throws IOException {
return metadataManager.getTransactionInfoTable().get(TRANSACTION_INFO_KEY);
return metadataManager.getTransactionInfoTable().getSkipCache(TRANSACTION_INFO_KEY);
}

public ByteString toByteString() throws IOException {
return ByteString.copyFrom(getCodec().toPersistedFormat(this));
}

public static TransactionInfo fromByteString(ByteString byteString) throws IOException {
return byteString == null ? null : getCodec().fromPersistedFormat(byteString.toByteArray());
}

public SnapshotInfo toSnapshotInfo() {
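Taken together, the two helpers above give TransactionInfo a lossless ByteString round-trip, which is what lets a TransactionInfo value be embedded in SnapshotInfo in the next file. A minimal sketch of that round-trip, assuming TransactionInfo.valueOf(TermIndex) as used later in this commit; the class name and the term/index values are illustrative:

import com.google.protobuf.ByteString;
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.ratis.server.protocol.TermIndex;

public final class TransactionInfoRoundTripSketch {
  public static void main(String[] args) throws Exception {
    // Build a TransactionInfo from an arbitrary Ratis term/index.
    TransactionInfo original = TransactionInfo.valueOf(TermIndex.valueOf(1, 42));

    // Serialize through the codec and restore the value.
    ByteString serialized = original.toByteString();
    TransactionInfo restored = TransactionInfo.fromByteString(serialized);
    assert original.compareTo(restored) == 0;

    // A null ByteString maps back to null, keeping the new optional
    // lastTransactionInfo proto field backward compatible.
    assert TransactionInfo.fromByteString(null) == null;
  }
}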
SnapshotInfo.java
@@ -19,6 +19,7 @@
*/

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.utils.db.Codec;
import org.apache.hadoop.hdds.utils.db.CopyObject;
@@ -124,6 +125,7 @@ public static SnapshotStatus valueOf(SnapshotStatusProto status) {
private long exclusiveSize;
private long exclusiveReplicatedSize;
private boolean deepCleanedDeletedDir;
private ByteString lastTransactionInfo;

private SnapshotInfo(Builder b) {
this.snapshotId = b.snapshotId;
@@ -145,6 +147,7 @@ private SnapshotInfo(Builder b) {
this.exclusiveSize = b.exclusiveSize;
this.exclusiveReplicatedSize = b.exclusiveReplicatedSize;
this.deepCleanedDeletedDir = b.deepCleanedDeletedDir;
this.lastTransactionInfo = b.lastTransactionInfo;
}

public void setName(String name) {
@@ -261,13 +264,15 @@ public SnapshotInfo.Builder toBuilder() {
.setGlobalPreviousSnapshotId(globalPreviousSnapshotId)
.setSnapshotPath(snapshotPath)
.setCheckpointDir(checkpointDir)
.setDbTxSequenceNumber(dbTxSequenceNumber)
.setDeepClean(deepClean)
.setSstFiltered(sstFiltered)
.setReferencedSize(referencedSize)
.setReferencedReplicatedSize(referencedReplicatedSize)
.setExclusiveSize(exclusiveSize)
.setExclusiveReplicatedSize(exclusiveReplicatedSize)
.setDeepCleanedDeletedDir(deepCleanedDeletedDir);
.setDeepCleanedDeletedDir(deepCleanedDeletedDir)
.setLastTransactionInfo(lastTransactionInfo);
}

/**
@@ -293,6 +298,7 @@ public static class Builder {
private long exclusiveSize;
private long exclusiveReplicatedSize;
private boolean deepCleanedDeletedDir;
private ByteString lastTransactionInfo;

public Builder() {
// default values
@@ -411,6 +417,11 @@ public Builder setDeepCleanedDeletedDir(boolean deepCleanedDeletedDir) {
return this;
}

public Builder setLastTransactionInfo(ByteString lastTransactionInfo) {
this.lastTransactionInfo = lastTransactionInfo;
return this;
}

public SnapshotInfo build() {
Preconditions.checkNotNull(name);
return new SnapshotInfo(this);
@@ -445,6 +456,10 @@ public OzoneManagerProtocolProtos.SnapshotInfo getProtobuf() {
sib.setGlobalPreviousSnapshotID(toProtobuf(globalPreviousSnapshotId));
}

if (lastTransactionInfo != null) {
sib.setLastTransactionInfo(lastTransactionInfo);
}

sib.setSnapshotPath(snapshotPath)
.setCheckpointDir(checkpointDir)
.setDbTxSequenceNumber(dbTxSequenceNumber)
@@ -513,6 +528,10 @@ public static SnapshotInfo getFromProtobuf(
snapshotInfoProto.getDeepCleanedDeletedDir());
}

if (snapshotInfoProto.hasLastTransactionInfo()) {
osib.setLastTransactionInfo(snapshotInfoProto.getLastTransactionInfo());
}

osib.setSnapshotPath(snapshotInfoProto.getSnapshotPath())
.setCheckpointDir(snapshotInfoProto.getCheckpointDir())
.setDbTxSequenceNumber(snapshotInfoProto.getDbTxSequenceNumber());
@@ -605,6 +624,14 @@ public void setDeepCleanedDeletedDir(boolean deepCleanedDeletedDir) {
this.deepCleanedDeletedDir = deepCleanedDeletedDir;
}

public ByteString getLastTransactionInfo() {
return lastTransactionInfo;
}

public void setLastTransactionInfo(ByteString lastTransactionInfo) {
this.lastTransactionInfo = lastTransactionInfo;
}

/**
* Generate default name of snapshot, (used if user doesn't provide one).
*/
@@ -673,7 +700,8 @@ public boolean equals(Object o) {
referencedReplicatedSize == that.referencedReplicatedSize &&
exclusiveSize == that.exclusiveSize &&
exclusiveReplicatedSize == that.exclusiveReplicatedSize &&
deepCleanedDeletedDir == that.deepCleanedDeletedDir;
deepCleanedDeletedDir == that.deepCleanedDeletedDir &&
Objects.equals(lastTransactionInfo, that.lastTransactionInfo);
}

@Override
@@ -684,35 +712,15 @@ public int hashCode() {
globalPreviousSnapshotId, snapshotPath, checkpointDir,
deepClean, sstFiltered,
referencedSize, referencedReplicatedSize,
exclusiveSize, exclusiveReplicatedSize, deepCleanedDeletedDir);
exclusiveSize, exclusiveReplicatedSize, deepCleanedDeletedDir, lastTransactionInfo);
}

/**
* Return a new copy of the object.
*/
@Override
public SnapshotInfo copyObject() {
return new Builder()
.setSnapshotId(snapshotId)
.setName(name)
.setVolumeName(volumeName)
.setBucketName(bucketName)
.setSnapshotStatus(snapshotStatus)
.setCreationTime(creationTime)
.setDeletionTime(deletionTime)
.setPathPreviousSnapshotId(pathPreviousSnapshotId)
.setGlobalPreviousSnapshotId(globalPreviousSnapshotId)
.setSnapshotPath(snapshotPath)
.setCheckpointDir(checkpointDir)
.setDbTxSequenceNumber(dbTxSequenceNumber)
.setDeepClean(deepClean)
.setSstFiltered(sstFiltered)
.setReferencedSize(referencedSize)
.setReferencedReplicatedSize(referencedReplicatedSize)
.setExclusiveSize(exclusiveSize)
.setExclusiveReplicatedSize(exclusiveReplicatedSize)
.setDeepCleanedDeletedDir(deepCleanedDeletedDir)
.build();
return this.toBuilder().build();
}

@Override
Expand All @@ -737,6 +745,7 @@ public String toString() {
", exclusiveSize: '" + exclusiveSize + '\'' +
", exclusiveReplicatedSize: '" + exclusiveReplicatedSize + '\'' +
", deepCleanedDeletedDir: '" + deepCleanedDeletedDir + '\'' +
", lastTransactionInfo: '" + lastTransactionInfo + '\'' +
'}';
}
}
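Since toBuilder() now carries every field, including dbTxSequenceNumber (previously dropped) and the new lastTransactionInfo, the simplified copyObject() above stays a faithful copy. A hedged sketch of the stamp-and-read-back flow this enables; the class and method names are illustrative:

import java.io.IOException;
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.ratis.server.protocol.TermIndex;

final class SnapshotInfoStampSketch {
  // Stamp a snapshot with the applied term/index, copy it, and read the
  // stamp back; copyObject() preserves the field via toBuilder().
  static TransactionInfo stampAndReadBack(SnapshotInfo snapshotInfo, TermIndex termIndex)
      throws IOException {
    snapshotInfo.setLastTransactionInfo(TransactionInfo.valueOf(termIndex).toByteString());
    SnapshotInfo copy = snapshotInfo.copyObject();
    return TransactionInfo.fromByteString(copy.getLastTransactionInfo());
  }
}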
OmClientProtocol.proto
@@ -886,6 +886,7 @@ message SnapshotInfo {
optional uint64 exclusiveReplicatedSize = 18;
// note: shared sizes can be calculated from: referenced - exclusive
optional bool deepCleanedDeletedDir = 19;
optional bytes lastTransactionInfo = 20;
}

message SnapshotDiffJobProto {
OmSnapshotManager.java
@@ -41,6 +41,7 @@
import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.server.ServerUtils;
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.CodecRegistry;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
@@ -674,6 +675,41 @@ private ReferenceCounted<OmSnapshot> getSnapshot(String snapshotTableKey, boolea
return snapshotCache.get(snapshotInfo.getSnapshotId());
}

/**
* Checks if the last transaction performed on the snapshot has been flushed to disk.
* @param metadataManager Metadata manager of the active OM.
* @param snapshotTableKey Table key corresponding to the snapshot in the snapshotInfoTable.
* @return true if the changes have been flushed to the DB, false otherwise.
* @throws IOException
*/
public static boolean areSnapshotChangesFlushedToDB(OMMetadataManager metadataManager, String snapshotTableKey)
throws IOException {
// Need this info from the cache, since the snapshot could have been updated only in the cache and not yet on disk.
SnapshotInfo snapshotInfo = metadataManager.getSnapshotInfoTable().get(snapshotTableKey);
return areSnapshotChangesFlushedToDB(metadataManager, snapshotInfo);
}

/**
* Checks if the last transaction performed on the snapshot has been flushed to disk.
* @param metadataManager Metadata manager of the active OM.
* @param snapshotInfo SnapshotInfo value.
* @return true if the changes have been flushed to the DB, false otherwise. Returns true if the provided
* snapshot is null, meaning the snapshot doesn't exist.
* @throws IOException
*/
public static boolean areSnapshotChangesFlushedToDB(OMMetadataManager metadataManager, SnapshotInfo snapshotInfo)
throws IOException {
if (snapshotInfo != null) {
TransactionInfo snapshotTransactionInfo = snapshotInfo.getLastTransactionInfo() != null ?
TransactionInfo.fromByteString(snapshotInfo.getLastTransactionInfo()) : null;
TransactionInfo omTransactionInfo = TransactionInfo.readTransactionInfo(metadataManager);
// If the transactionInfo field is null, return true to keep things backward compatible.
return snapshotTransactionInfo == null || omTransactionInfo.compareTo(snapshotTransactionInfo) >= 0;
}
return true;
}


/**
* Returns OmSnapshot object and skips active check.
* This should only be used for API calls initiated by background service e.g. purgeKeys, purgeSnapshot,
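Background services can gate follow-up work on this check. A minimal usage sketch, assuming the helpers above live in OmSnapshotManager; the guard class and method names are made up:

import java.io.IOException;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OmSnapshotManager;

final class PurgeGuardSketch {
  // Hypothetical guard: only issue another purge/move request against the
  // snapshot once its last transaction is known to be flushed to the OM DB.
  static boolean safeToProceed(OMMetadataManager metadataManager, String snapshotTableKey)
      throws IOException {
    return OmSnapshotManager.areSnapshotChangesFlushedToDB(metadataManager, snapshotTableKey);
  }
}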
OMDirectoriesPurgeRequestWithFSO.java
@@ -25,6 +25,7 @@
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.hadoop.ozone.OzoneConsts;
@@ -149,6 +150,11 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
}
}
}
if (fromSnapshotInfo != null) {
fromSnapshotInfo.setLastTransactionInfo(TransactionInfo.valueOf(termIndex).toByteString());
omMetadataManager.getSnapshotInfoTable().addCacheEntry(new CacheKey<>(fromSnapshotInfo.getTableKey()),
CacheValue.get(termIndex.getIndex(), fromSnapshotInfo));
}
} catch (IOException ex) {
// Case of IOException for fromProtobuf will not happen
// as this is created and sent within the OM
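The handler above and several handlers below (key purge, move deleted keys, snapshot purge) repeat the same stamp-then-publish sequence: set lastTransactionInfo, then add a table cache entry so readers see the update before the double buffer flushes it to disk. Factored out, the pattern would look roughly like this hypothetical helper, which is not part of the commit:

import java.io.IOException;
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.ratis.server.protocol.TermIndex;

final class SnapshotStampAndCacheSketch {
  // Record the applied term/index on the snapshot, then publish the updated
  // SnapshotInfo through the table cache keyed by the transaction index.
  static void stampAndCache(OMMetadataManager omMetadataManager, SnapshotInfo snapshotInfo,
      TermIndex termIndex) throws IOException {
    snapshotInfo.setLastTransactionInfo(TransactionInfo.valueOf(termIndex).toByteString());
    omMetadataManager.getSnapshotInfoTable().addCacheEntry(
        new CacheKey<>(snapshotInfo.getTableKey()),
        CacheValue.get(termIndex.getIndex(), snapshotInfo));
  }
}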
OMKeyPurgeRequest.java
@@ -21,6 +21,10 @@
import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.hadoop.ozone.om.OzoneManager;
@@ -61,6 +65,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
String fromSnapshot = purgeKeysRequest.hasSnapshotTableKey() ?
purgeKeysRequest.getSnapshotTableKey() : null;
List<String> keysToBePurgedList = new ArrayList<>();
OmMetadataManagerImpl omMetadataManager = (OmMetadataManagerImpl) ozoneManager.getMetadataManager();

OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
getOmRequest());
@@ -71,17 +76,27 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
keysToBePurgedList.add(deletedKey);
}
}
final SnapshotInfo fromSnapshotInfo;

try {
SnapshotInfo fromSnapshotInfo = null;
if (fromSnapshot != null) {
fromSnapshotInfo = SnapshotUtils.getSnapshotInfo(ozoneManager, fromSnapshot);
}
omClientResponse = new OMKeyPurgeResponse(omResponse.build(),
keysToBePurgedList, fromSnapshotInfo, keysToUpdateList);
fromSnapshotInfo = fromSnapshot == null ? null : SnapshotUtils.getSnapshotInfo(ozoneManager, fromSnapshot);
} catch (IOException ex) {
omClientResponse = new OMKeyPurgeResponse(createErrorOMResponse(omResponse, ex));
return new OMKeyPurgeResponse(createErrorOMResponse(omResponse, ex));
}

// Set the transaction info on the snapshot; this prevents duplicate purge requests to the OM from background
// services.
try {
if (fromSnapshotInfo != null) {
fromSnapshotInfo.setLastTransactionInfo(TransactionInfo.valueOf(termIndex).toByteString());
omMetadataManager.getSnapshotInfoTable().addCacheEntry(new CacheKey<>(fromSnapshotInfo.getTableKey()),
CacheValue.get(termIndex.getIndex(), fromSnapshotInfo));
}
} catch (IOException e) {
return new OMKeyPurgeResponse(createErrorOMResponse(omResponse, e));
}
omClientResponse = new OMKeyPurgeResponse(omResponse.build(), keysToBePurgedList, fromSnapshotInfo,
keysToUpdateList);

return omClientResponse;
}
OMSnapshotCreateRequest.java
@@ -21,6 +21,7 @@
import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.hadoop.hdds.utils.db.RDBStore;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
@@ -166,7 +167,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
((RDBStore) omMetadataManager.getStore()).getDb()
.getLatestSequenceNumber();
snapshotInfo.setDbTxSequenceNumber(dbLatestSequenceNumber);

snapshotInfo.setLastTransactionInfo(TransactionInfo.valueOf(termIndex).toByteString());
// Snapshot referenced size should be bucket's used bytes
OmBucketInfo omBucketInfo =
getBucketInfo(omMetadataManager, volumeName, bucketName);
OMSnapshotMoveDeletedKeysRequest.java
@@ -20,6 +20,9 @@
package org.apache.hadoop.ozone.om.request.snapshot;

import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.OzoneManager;
@@ -82,15 +85,20 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
nextSnapshot = SnapshotUtils.getNextActiveSnapshot(fromSnapshot, snapshotChainManager, ozoneManager);

// Get next non-deleted snapshot.
List<SnapshotMoveKeyInfos> nextDBKeysList =
moveDeletedKeysRequest.getNextDBKeysList();
List<SnapshotMoveKeyInfos> reclaimKeysList =
moveDeletedKeysRequest.getReclaimKeysList();
List<HddsProtos.KeyValue> renamedKeysList =
moveDeletedKeysRequest.getRenamedKeysList();
List<String> movedDirs =
moveDeletedKeysRequest.getDeletedDirsToMoveList();

List<SnapshotMoveKeyInfos> nextDBKeysList = moveDeletedKeysRequest.getNextDBKeysList();
List<SnapshotMoveKeyInfos> reclaimKeysList = moveDeletedKeysRequest.getReclaimKeysList();
List<HddsProtos.KeyValue> renamedKeysList = moveDeletedKeysRequest.getRenamedKeysList();
List<String> movedDirs = moveDeletedKeysRequest.getDeletedDirsToMoveList();

// Update lastTransactionInfo for fromSnapshot and the nextSnapshot.
fromSnapshot.setLastTransactionInfo(TransactionInfo.valueOf(termIndex).toByteString());
omMetadataManager.getSnapshotInfoTable().addCacheEntry(new CacheKey<>(fromSnapshot.getTableKey()),
CacheValue.get(termIndex.getIndex(), fromSnapshot));
if (nextSnapshot != null) {
nextSnapshot.setLastTransactionInfo(TransactionInfo.valueOf(termIndex).toByteString());
omMetadataManager.getSnapshotInfoTable().addCacheEntry(new CacheKey<>(nextSnapshot.getTableKey()),
CacheValue.get(termIndex.getIndex(), nextSnapshot));
}
omClientResponse = new OMSnapshotMoveDeletedKeysResponse(
omResponse.build(), fromSnapshot, nextSnapshot,
nextDBKeysList, reclaimKeysList, renamedKeysList, movedDirs);
OMSnapshotPurgeRequest.java
@@ -19,6 +19,7 @@

package org.apache.hadoop.ozone.om.request.snapshot;

import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OMMetrics;
import org.apache.ratis.server.protocol.TermIndex;
@@ -110,9 +111,16 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
updateSnapshotInfoAndCache(nextSnapshot, omMetadataManager, trxnLogIndex);
// Step 2: Update the snapshot chain.
updateSnapshotChainAndCache(omMetadataManager, fromSnapshot, trxnLogIndex);
// Step 3: Purge the snapshot from SnapshotInfoTable cache.
// Step 3: Purge the snapshot from SnapshotInfoTable cache and also remove from the map.
omMetadataManager.getSnapshotInfoTable()
.addCacheEntry(new CacheKey<>(fromSnapshot.getTableKey()), CacheValue.get(trxnLogIndex));
updatedSnapshotInfos.remove(fromSnapshot.getTableKey());
}

for (SnapshotInfo snapshotInfo : updatedSnapshotInfos.values()) {
snapshotInfo.setLastTransactionInfo(TransactionInfo.valueOf(termIndex).toByteString());
omMetadataManager.getSnapshotInfoTable().addCacheEntry(new CacheKey<>(snapshotInfo.getTableKey()),
CacheValue.get(termIndex.getIndex(), snapshotInfo));
}

omClientResponse = new OMSnapshotPurgeResponse(omResponse.build(), snapshotDbKeys, updatedSnapshotInfos);