Skip to content

Commit

Permalink
HDDS-11408. Snapshot rename table entries are propagated incorrectly …
Browse files Browse the repository at this point in the history
…on snapshot deletes (apache#7200)
  • Loading branch information
swamirishi authored and sarvekshayr committed Oct 7, 2024
1 parent 84f3f6f commit 8b9e4de
Show file tree
Hide file tree
Showing 30 changed files with 2,237 additions and 1,059 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.hdds.annotation.InterfaceStability;
Expand Down Expand Up @@ -354,6 +355,24 @@ public V getValue() {
public String toString() {
return "(key=" + key + ", value=" + value + ")";
}

@Override
public boolean equals(Object obj) {
if (!(obj instanceof KeyValue)) {
return false;
}
KeyValue<?, ?> kv = (KeyValue<?, ?>) obj;
try {
return getKey().equals(kv.getKey()) && getValue().equals(kv.getValue());
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Override
public int hashCode() {
return Objects.hash(getKey(), getValue());
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ public static boolean isReadOnly(
case DeleteSnapshot:
case RenameSnapshot:
case SnapshotMoveDeletedKeys:
case SnapshotMoveTableKeys:
case SnapshotPurge:
case RecoverLease:
case SetTimes:
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ enum Type {
GetServerDefaults = 134;
GetQuotaRepairStatus = 135;
StartQuotaRepair = 136;
SnapshotMoveTableKeys = 137;
}

enum SafeMode {
Expand Down Expand Up @@ -295,6 +296,7 @@ message OMRequest {
optional ServerDefaultsRequest ServerDefaultsRequest = 132;
optional GetQuotaRepairStatusRequest GetQuotaRepairStatusRequest = 133;
optional StartQuotaRepairRequest StartQuotaRepairRequest = 134;
optional SnapshotMoveTableKeysRequest SnapshotMoveTableKeysRequest = 135;
}

message OMResponse {
Expand Down Expand Up @@ -1981,6 +1983,13 @@ message SnapshotMoveDeletedKeysRequest {
repeated string deletedDirsToMove = 5;
}

message SnapshotMoveTableKeysRequest {
optional hadoop.hdds.UUID fromSnapshotID = 1;
repeated SnapshotMoveKeyInfos deletedKeys = 2;
repeated SnapshotMoveKeyInfos deletedDirs = 3;
repeated hadoop.hdds.KeyValue renamedKeys = 4;
}

message SnapshotMoveKeyInfos {
optional string key = 1;
repeated KeyInfo keyInfos = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,22 @@ public interface OMMetadataManager extends DBStoreHAManager {
*/
String getBucketKey(String volume, String bucket);

/**
* Given a volume and bucket, return the corresponding DB key prefix.
*
* @param volume - Volume name
* @param bucket - Bucket name
*/
String getBucketKeyPrefix(String volume, String bucket);

/**
* Given a volume and bucket, return the corresponding DB key prefix for FSO buckets.
*
* @param volume - Volume name
* @param bucket - Bucket name
*/
String getBucketKeyPrefixFSO(String volume, String bucket) throws IOException;

/**
* Given a volume, bucket and a key, return the corresponding DB key.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
Expand All @@ -28,13 +29,15 @@
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
import org.apache.hadoop.ozone.om.fs.OzoneManagerFS;
import org.apache.hadoop.hdds.utils.BackgroundService;
import org.apache.hadoop.ozone.om.service.DirectoryDeletingService;
import org.apache.hadoop.ozone.om.service.KeyDeletingService;
import org.apache.hadoop.ozone.om.service.SnapshotDeletingService;
import org.apache.hadoop.ozone.om.service.SnapshotDirectoryCleaningService;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadsBucket;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/**
Expand Down Expand Up @@ -119,6 +122,29 @@ ListKeysResult listKeys(String volumeName, String bucketName, String startKey,
*/
PendingKeysDeletion getPendingDeletionKeys(int count) throws IOException;

/**
* Returns a list rename entries from the snapshotRenamedTable.
*
* @param size max number of keys to return.
* @return a Pair of list of {@link org.apache.hadoop.hdds.utils.db.Table.KeyValue} representing the keys in the
* underlying metadataManager.
* @throws IOException
*/
List<Table.KeyValue<String, String>> getRenamesKeyEntries(
String volume, String bucket, String startKey, int size) throws IOException;


/**
* Returns a list deleted entries from the deletedTable.
*
* @param size max number of keys to return.
* @return a Pair of list of {@link org.apache.hadoop.hdds.utils.db.Table.KeyValue} representing the keys in the
* underlying metadataManager.
* @throws IOException
*/
List<Table.KeyValue<String, List<OmKeyInfo>>> getDeletedKeyEntries(
String volume, String bucket, String startKey, int size) throws IOException;

/**
* Returns the names of up to {@code count} open keys whose age is
* greater than or equal to {@code expireThreshold}.
Expand Down Expand Up @@ -216,6 +242,26 @@ OmMultipartUploadListParts listParts(String volumeName, String bucketName,
*/
Table.KeyValue<String, OmKeyInfo> getPendingDeletionDir() throws IOException;

/**
* Returns an iterator for pending deleted directories.
* @throws IOException
*/
TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> getDeletedDirEntries(
String volume, String bucket) throws IOException;

default List<Table.KeyValue<String, OmKeyInfo>> getDeletedDirEntries(String volume, String bucket, int size)
throws IOException {
List<Table.KeyValue<String, OmKeyInfo>> deletedDirEntries = new ArrayList<>(size);
try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> iterator =
getDeletedDirEntries(volume, bucket)) {
while (deletedDirEntries.size() < size && iterator.hasNext()) {
Table.KeyValue<String, OmKeyInfo> kv = iterator.next();
deletedDirEntries.add(Table.newKeyValue(kv.getKey(), kv.getValue()));
}
return deletedDirEntries;
}
}

/**
* Returns all sub directories under the given parent directory.
*
Expand Down Expand Up @@ -243,7 +289,7 @@ List<OmKeyInfo> getPendingDeletionSubFiles(long volumeId,
* Returns the instance of Directory Deleting Service.
* @return Background service.
*/
BackgroundService getDirDeletingService();
DirectoryDeletingService getDirDeletingService();

/**
* Returns the instance of Open Key Cleanup Service.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.Stack;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -86,6 +87,7 @@
import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.request.OMClientRequest;
import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
import org.apache.hadoop.ozone.om.request.util.OMMultipartUploadUtils;
Expand Down Expand Up @@ -189,7 +191,7 @@ public class KeyManagerImpl implements KeyManager {

private final KeyProviderCryptoExtension kmsProvider;
private final boolean enableFileSystemPaths;
private BackgroundService dirDeletingService;
private DirectoryDeletingService dirDeletingService;
private final OMPerformanceMetrics metrics;

private BackgroundService openKeyCleanupService;
Expand Down Expand Up @@ -305,7 +307,7 @@ public void start(OzoneConfiguration configuration) {
try {
snapshotDeletingService = new SnapshotDeletingService(
snapshotServiceInterval, snapshotServiceTimeout,
ozoneManager, scmClient.getBlockClient());
ozoneManager);
snapshotDeletingService.start();
} catch (IOException e) {
LOG.error("Error starting Snapshot Deleting Service", e);
Expand Down Expand Up @@ -662,6 +664,60 @@ public PendingKeysDeletion getPendingDeletionKeys(final int count)
.getPendingDeletionKeys(count, ozoneManager.getOmSnapshotManager());
}

private <V, R> List<Table.KeyValue<String, R>> getTableEntries(String startKey,
TableIterator<String, ? extends Table.KeyValue<String, V>> tableIterator,
Function<V, R> valueFunction, int size) throws IOException {
List<Table.KeyValue<String, R>> entries = new ArrayList<>();
/* Seek to the start key if it not null. The next key in queue is ensured to start with the bucket
prefix, {@link org.apache.hadoop.hdds.utils.db.Table#iterator(bucketPrefix)} would ensure this.
*/
if (startKey != null) {
tableIterator.seek(startKey);
tableIterator.seekToFirst();
}
int currentCount = 0;
while (tableIterator.hasNext() && currentCount < size) {
Table.KeyValue<String, V> kv = tableIterator.next();
if (kv != null) {
entries.add(Table.newKeyValue(kv.getKey(), valueFunction.apply(kv.getValue())));
currentCount++;
}
}
return entries;
}

private Optional<String> getBucketPrefix(String volumeName, String bucketName, boolean isFSO) throws IOException {
// Bucket prefix would be empty if both volume & bucket is empty i.e. either null or "".
if (StringUtils.isEmpty(volumeName) && StringUtils.isEmpty(bucketName)) {
return Optional.empty();
} else if (StringUtils.isEmpty(bucketName) || StringUtils.isEmpty(volumeName)) {
throw new IOException("One of volume : " + volumeName + ", bucket: " + bucketName + " is empty." +
" Either both should be empty or none of the arguments should be empty");
}
return isFSO ? Optional.of(metadataManager.getBucketKeyPrefixFSO(volumeName, bucketName)) :
Optional.of(metadataManager.getBucketKeyPrefix(volumeName, bucketName));
}

@Override
public List<Table.KeyValue<String, String>> getRenamesKeyEntries(
String volume, String bucket, String startKey, int size) throws IOException {
Optional<String> bucketPrefix = getBucketPrefix(volume, bucket, false);
try (TableIterator<String, ? extends Table.KeyValue<String, String>>
renamedKeyIter = metadataManager.getSnapshotRenamedTable().iterator(bucketPrefix.orElse(""))) {
return getTableEntries(startKey, renamedKeyIter, Function.identity(), size);
}
}

@Override
public List<Table.KeyValue<String, List<OmKeyInfo>>> getDeletedKeyEntries(
String volume, String bucket, String startKey, int size) throws IOException {
Optional<String> bucketPrefix = getBucketPrefix(volume, bucket, false);
try (TableIterator<String, ? extends Table.KeyValue<String, RepeatedOmKeyInfo>>
delKeyIter = metadataManager.getDeletedTable().iterator(bucketPrefix.orElse(""))) {
return getTableEntries(startKey, delKeyIter, RepeatedOmKeyInfo::cloneOmKeyInfoList, size);
}
}

@Override
public ExpiredOpenKeys getExpiredOpenKeys(Duration expireThreshold,
int count, BucketLayout bucketLayout, Duration leaseThreshold) throws IOException {
Expand All @@ -688,7 +744,7 @@ public KeyDeletingService getDeletingService() {
}

@Override
public BackgroundService getDirDeletingService() {
public DirectoryDeletingService getDirDeletingService() {
return dirDeletingService;
}

Expand Down Expand Up @@ -723,8 +779,7 @@ public boolean isSstFilteringSvcEnabled() {
TimeUnit.MILLISECONDS);
return serviceInterval != DISABLE_VALUE;
}



@Override
public OmMultipartUploadList listMultipartUploads(String volumeName,
String bucketName, String prefix) throws OMException {
Expand Down Expand Up @@ -1325,7 +1380,6 @@ private OmKeyInfo createFakeDirIfShould(String volume, String bucket,
return null;
}


private OzoneFileStatus getOzoneFileStatusFSO(OmKeyArgs args,
String clientAddress, boolean skipFileNotFoundError) throws IOException {
final String volumeName = args.getVolumeName();
Expand Down Expand Up @@ -1784,17 +1838,13 @@ private List<OzoneFileStatus> buildFinalStatusList(
}
fileStatusFinalList.add(fileStatus);
}

return sortPipelineInfo(fileStatusFinalList, keyInfoList,
omKeyArgs, clientAddress);
}


private List<OzoneFileStatus> sortPipelineInfo(
List<OzoneFileStatus> fileStatusFinalList, List<OmKeyInfo> keyInfoList,
OmKeyArgs omKeyArgs, String clientAddress) throws IOException {


if (omKeyArgs.getLatestVersionLocation()) {
slimLocationVersion(keyInfoList.toArray(new OmKeyInfo[0]));
}
Expand Down Expand Up @@ -1976,6 +2026,13 @@ public Table.KeyValue<String, OmKeyInfo> getPendingDeletionDir()
return null;
}

@Override
public TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> getDeletedDirEntries(
String volume, String bucket) throws IOException {
Optional<String> bucketPrefix = getBucketPrefix(volume, bucket, true);
return metadataManager.getDeletedDirTable().iterator(bucketPrefix.orElse(""));
}

@Override
public List<OmKeyInfo> getPendingDeletionSubDirs(long volumeId, long bucketId,
OmKeyInfo parentInfo, long numEntries) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,7 @@ public String getUserKey(String user) {
/**
* Given a volume and bucket, return the corresponding DB key.
*
* @param volume - User name
* @param volume - Volume name
* @param bucket - Bucket name
*/
@Override
Expand All @@ -838,6 +838,22 @@ public String getBucketKey(String volume, String bucket) {
return builder.toString();
}

/**
* {@inheritDoc}
*/
@Override
public String getBucketKeyPrefix(String volume, String bucket) {
return getOzoneKey(volume, bucket, OM_KEY_PREFIX);
}

/**
* {@inheritDoc}
*/
@Override
public String getBucketKeyPrefixFSO(String volume, String bucket) throws IOException {
return getOzoneKeyFSO(volume, bucket, OM_KEY_PREFIX);
}

@Override
public String getOzoneKey(String volume, String bucket, String key) {
StringBuilder builder = new StringBuilder()
Expand Down
Loading

0 comments on commit 8b9e4de

Please sign in to comment.