Skip to content

Commit

Permalink
Add Changes in Snapshot Clone Flow for remote store interoperability. (
Browse files Browse the repository at this point in the history
…opensearch-project#7496)

Signed-off-by: Bansi Kasundra <kasundra@amazon.com>
Signed-off-by: Shivansh Arora <hishiv@amazon.com>
  • Loading branch information
kasundra07 authored and shiv0408 committed Apr 25, 2024
1 parent d94edb4 commit b2a94f4
Show file tree
Hide file tree
Showing 15 changed files with 530 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,15 @@
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.SnapshotsInProgress;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.RepositoryData;
import org.opensearch.snapshots.mockstore.MockRepository;
import org.opensearch.test.FeatureFlagSetter;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.ArrayList;
Expand All @@ -64,6 +68,7 @@
import org.opensearch.repositories.blobstore.BlobStoreRepository;

import java.nio.file.Path;
import java.util.concurrent.ExecutionException;

import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -153,6 +158,162 @@ public void testCloneSnapshotIndex() throws Exception {
assertEquals(status1.getStats().getTotalSize(), status2.getStats().getTotalSize());
}

public void testCloneShallowSnapshotIndex() throws Exception {
disableRepoConsistencyCheck("This test uses remote store repository");
FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE);
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNode();

final String snapshotRepoName = "snapshot-repo-name";
final Path snapshotRepoPath = randomRepoPath();
createRepository(snapshotRepoName, "fs", snapshotRepoPath);

final String shallowSnapshotRepoName = "shallow-snapshot-repo-name";
final Path shallowSnapshotRepoPath = randomRepoPath();
createRepository(shallowSnapshotRepoName, "fs", snapshotRepoSettingsForShallowCopy(shallowSnapshotRepoPath));

final Path remoteStoreRepoPath = randomRepoPath();
final String remoteStoreRepoName = "remote-store-repo-name";
createRepository(remoteStoreRepoName, "fs", remoteStoreRepoPath);

final String indexName = "index-1";
createIndexWithRandomDocs(indexName, randomIntBetween(5, 10));

final String remoteStoreEnabledIndexName = "remote-index-1";
final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(remoteStoreRepoName);
createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings);
indexRandomDocs(remoteStoreEnabledIndexName, randomIntBetween(5, 10));

final String snapshot = "snapshot";
createFullSnapshot(snapshotRepoName, snapshot);
assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == 0);

indexRandomDocs(indexName, randomIntBetween(20, 100));

final String shallowSnapshot = "shallow-snapshot";
createFullSnapshot(shallowSnapshotRepoName, shallowSnapshot);
assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == 1);

if (randomBoolean()) {
assertAcked(admin().indices().prepareDelete(indexName));
}

final String sourceSnapshot = shallowSnapshot;
final String targetSnapshot = "target-snapshot";
assertAcked(startClone(shallowSnapshotRepoName, sourceSnapshot, targetSnapshot, indexName, remoteStoreEnabledIndexName).get());
logger.info("Lock files count: {}", getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length);
assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == 2);
}

public void testShallowCloneNameAvailability() throws Exception {
disableRepoConsistencyCheck("This test uses remote store repository");
FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE);
internalCluster().startClusterManagerOnlyNode(LARGE_SNAPSHOT_POOL_SETTINGS);
internalCluster().startDataOnlyNode();

final String shallowSnapshotRepoName = "shallow-snapshot-repo-name";
final Path shallowSnapshotRepoPath = randomRepoPath();
createRepository(shallowSnapshotRepoName, "fs", snapshotRepoSettingsForShallowCopy(shallowSnapshotRepoPath));

final Path remoteStoreRepoPath = randomRepoPath();
final String remoteStoreRepoName = "remote-store-repo-name";
createRepository(remoteStoreRepoName, "fs", remoteStoreRepoPath);

final String indexName = "index-1";
createIndexWithRandomDocs(indexName, randomIntBetween(5, 10));

final String remoteStoreEnabledIndexName = "remote-index-1";
final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(remoteStoreRepoName);
createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings);
indexRandomDocs(remoteStoreEnabledIndexName, randomIntBetween(5, 10));

final String shallowSnapshot1 = "snapshot1";
createFullSnapshot(shallowSnapshotRepoName, shallowSnapshot1);

final String shallowSnapshot2 = "snapshot2";
createFullSnapshot(shallowSnapshotRepoName, shallowSnapshot2);

ExecutionException ex = expectThrows(
ExecutionException.class,
() -> startClone(shallowSnapshotRepoName, shallowSnapshot1, shallowSnapshot2, indexName, remoteStoreEnabledIndexName).get()
);
assertThat(ex.getMessage(), containsString("snapshot with the same name already exists"));
}

public void testCloneAfterRepoShallowSettingEnabled() throws Exception {
disableRepoConsistencyCheck("This test uses remote store repository");
FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE);
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNode();

final String snapshotRepoName = "snapshot-repo-name";
final Path snapshotRepoPath = randomRepoPath();
createRepository(snapshotRepoName, "fs", snapshotRepoPath);

final Path remoteStoreRepoPath = randomRepoPath();
final String remoteStoreRepoName = "remote-store-repo-name";
createRepository(remoteStoreRepoName, "fs", remoteStoreRepoPath);

final String indexName = "index-1";
createIndexWithRandomDocs(indexName, randomIntBetween(5, 10));

final String remoteStoreEnabledIndexName = "remote-index-1";
final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(remoteStoreRepoName);
createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings);
indexRandomDocs(remoteStoreEnabledIndexName, randomIntBetween(5, 10));

final String snapshot = "snapshot";
createFullSnapshot(snapshotRepoName, snapshot);
assertEquals(getSnapshot(snapshotRepoName, snapshot).state(), SnapshotState.SUCCESS);

// Updating the snapshot repository flag to enable shallow snapshots
createRepository(snapshotRepoName, "fs", snapshotRepoSettingsForShallowCopy(snapshotRepoPath));
RepositoryMetadata updatedRepositoryMetadata = clusterAdmin().prepareGetRepositories(snapshotRepoName).get().repositories().get(0);
assertTrue(updatedRepositoryMetadata.settings().getAsBoolean(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), false));

final String targetSnapshot = "target-snapshot";
assertAcked(startClone(snapshotRepoName, snapshot, targetSnapshot, indexName, remoteStoreEnabledIndexName).get());
assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == 0);
assertEquals(getSnapshot(snapshotRepoName, targetSnapshot).isRemoteStoreIndexShallowCopyEnabled(), false);
}

public void testCloneAfterRepoShallowSettingDisabled() throws Exception {
disableRepoConsistencyCheck("This test uses remote store repository");
FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE);
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNode();

final String snapshotRepoName = "snapshot-repo-name";
final Path snapshotRepoPath = randomRepoPath();
createRepository(snapshotRepoName, "fs", snapshotRepoSettingsForShallowCopy(snapshotRepoPath));

final Path remoteStoreRepoPath = randomRepoPath();
final String remoteStoreRepoName = "remote-store-repo-name";
createRepository(remoteStoreRepoName, "fs", remoteStoreRepoPath);

final String indexName = "index-1";
createIndexWithRandomDocs(indexName, randomIntBetween(5, 10));

final String remoteStoreEnabledIndexName = "remote-index-1";
final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(remoteStoreRepoName);
createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings);
indexRandomDocs(remoteStoreEnabledIndexName, randomIntBetween(5, 10));

final String snapshot = "snapshot";
createFullSnapshot(snapshotRepoName, snapshot);
assertEquals(getSnapshot(snapshotRepoName, snapshot).state(), SnapshotState.SUCCESS);

// Updating the snapshot repository flag to enable shallow snapshots
createRepository(snapshotRepoName, "fs", snapshotRepoPath);
RepositoryMetadata updatedRepositoryMetadata = clusterAdmin().prepareGetRepositories(snapshotRepoName).get().repositories().get(0);
assertFalse(updatedRepositoryMetadata.settings().getAsBoolean(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), false));

final String targetSnapshot = "target-snapshot";
assertAcked(startClone(snapshotRepoName, snapshot, targetSnapshot, indexName, remoteStoreEnabledIndexName).get());
assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == 2);
assertEquals(getSnapshot(snapshotRepoName, targetSnapshot).isRemoteStoreIndexShallowCopyEnabled(), true);
}

public void testClonePreventsSnapshotDelete() throws Exception {
final String clusterManagerName = internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ public static Entry startClone(
version,
source,
Map.of(),
false // TODO: need to pull this value from the original snapshot, use whatever we set during snapshot create.
false // initialising to false, will be updated in startCloning method of SnapshotsService while updating entry with
// clone jobs
);
}

Expand Down Expand Up @@ -453,6 +454,26 @@ public Entry withClones(final Map<RepositoryShardId, ShardSnapshotStatus> update
);
}

public Entry withRemoteStoreIndexShallowCopy(final boolean remoteStoreIndexShallowCopy) {
return new Entry(
snapshot,
includeGlobalState,
partial,
state,
indices,
dataStreams,
startTime,
repositoryStateId,
shards,
failure,
userMetadata,
version,
source,
clones,
remoteStoreIndexShallowCopy
);
}

/**
* Create a new instance by aborting this instance. Moving all in-progress shards to {@link ShardState#ABORTED} if assigned to a
* data node or to {@link ShardState#FAILED} if not assigned to any data node.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,4 +408,29 @@ private void verifyParameters(
throw new IllegalArgumentException(exceptionStr);
}
}

/**
* Creates a new instance which has a different name and zero incremental file counts but is identical to this instance in terms of the files
* it references.
*
* @param targetSnapshotName target snapshot name
* @param startTime time the clone operation on the repository was started
* @param time time it took to create the clone
*/
public RemoteStoreShardShallowCopySnapshot asClone(String targetSnapshotName, long startTime, long time) {
return new RemoteStoreShardShallowCopySnapshot(
targetSnapshotName,
indexVersion,
primaryTerm,
commitGeneration,
startTime,
time,
totalFileCount,
totalSize,
indexUUID,
remoteStoreRepository,
repositoryBasePath,
fileNames
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.index.store.lockmanager;

import java.nio.file.NoSuchFileException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -50,13 +51,21 @@ String getLockPrefix() {
return fileToLock + RemoteStoreLockManagerUtils.SEPARATOR;
}

List<String> getLocksForAcquirer(String[] lockFiles) {
String getLockForAcquirer(String[] lockFiles) throws NoSuchFileException {
if (acquirerId == null || acquirerId.isBlank()) {
throw new IllegalArgumentException("Acquirer ID should be provided");
}
return Arrays.stream(lockFiles)
List<String> locksForAcquirer = Arrays.stream(lockFiles)
.filter(lockFile -> acquirerId.equals(LockFileUtils.getAcquirerIdFromLock(lockFile)))
.collect(Collectors.toList());

if (locksForAcquirer.isEmpty()) {
throw new NoSuchFileException("No lock file found for the acquirer: " + acquirerId);
}
if (locksForAcquirer.size() != 1) {
throw new IllegalStateException("Expected single lock file but found [" + locksForAcquirer.size() + "] lock files");
}
return locksForAcquirer.get(0);
}

public static LockInfoBuilder getLockInfoBuilder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ public interface RemoteStoreLockManager {
*/
Boolean isAcquired(LockInfo lockInfo) throws IOException;

/**
* Acquires lock on the file mentioned in originalLockInfo for acquirer mentioned in clonedLockInfo.
* There can occur a race condition where the original file is deleted before we can use it to acquire lock for the new acquirer. Until we have a
* fix on LockManager side, Implementors must ensure thread safety for this operation.
* @param originalLockInfo lock info instance for original lock.
* @param clonedLockInfo lock info instance for which lock needs to be cloned.
* @throws IOException throws IOException if originalResource itself do not have any lock.
*/
void cloneLock(LockInfo originalLockInfo, LockInfo clonedLockInfo) throws IOException;

/*
Deletes all lock related files and directories
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
import org.opensearch.index.store.RemoteBufferedOutputDirectory;

import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.Collection;
import java.util.List;
import java.util.Objects;

/**
* A Class that implements Remote Store Lock Manager by creating lock files for the remote store files that needs to
Expand Down Expand Up @@ -48,6 +49,7 @@ public void acquire(LockInfo lockInfo) throws IOException {

/**
* Releases Locks acquired by a given acquirer which is passed in LockInfo Instance.
* If the lock file doesn't exist for the acquirer, release will be a no-op.
* Right now this method is only used to release locks for a given acquirer,
* This can be extended in future to handle other cases as well, like:
* - release lock for given fileToLock and AcquirerId
Expand All @@ -59,15 +61,12 @@ public void acquire(LockInfo lockInfo) throws IOException {
public void release(LockInfo lockInfo) throws IOException {
assert lockInfo instanceof FileLockInfo : "lockInfo should be instance of FileLockInfo";
String[] lockFiles = lockDirectory.listAll();

// ideally there should be only one lock per acquirer, but just to handle any stale locks,
// we try to release all the locks for the acquirer.
List<String> locksToRelease = ((FileLockInfo) lockInfo).getLocksForAcquirer(lockFiles);
if (locksToRelease.size() > 1) {
logger.warn(locksToRelease.size() + " locks found for acquirer " + ((FileLockInfo) lockInfo).getAcquirerId());
}
for (String lock : locksToRelease) {
lockDirectory.deleteFile(lock);
try {
String lockToRelease = ((FileLockInfo) lockInfo).getLockForAcquirer(lockFiles);
lockDirectory.deleteFile(lockToRelease);
} catch (NoSuchFileException e) {
// Ignoring if the file to be deleted is not present.
logger.info("No lock file found for acquirerId: {}", ((FileLockInfo) lockInfo).getAcquirerId());
}
}

Expand All @@ -84,6 +83,27 @@ public Boolean isAcquired(LockInfo lockInfo) throws IOException {
return !lockFiles.isEmpty();
}

/**
* Acquires lock on the file mentioned in originalLockInfo for acquirer mentioned in clonedLockInfo.
* Snapshot layer enforces thread safety by having checks in place to ensure that the source snapshot is not being deleted before proceeding
* with the clone operation. Hence, the original lock file would always be present while acquiring the lock for cloned snapshot.
* @param originalLockInfo lock info instance for original lock.
* @param clonedLockInfo lock info instance for which lock needs to be cloned.
* @throws IOException throws IOException if originalResource itself do not have any lock.
*/
@Override
public void cloneLock(LockInfo originalLockInfo, LockInfo clonedLockInfo) throws IOException {
assert originalLockInfo instanceof FileLockInfo : "originalLockInfo should be instance of FileLockInfo";
assert clonedLockInfo instanceof FileLockInfo : "clonedLockInfo should be instance of FileLockInfo";
String originalResourceId = Objects.requireNonNull(((FileLockInfo) originalLockInfo).getAcquirerId());
String clonedResourceId = Objects.requireNonNull(((FileLockInfo) clonedLockInfo).getAcquirerId());
assert originalResourceId != null && clonedResourceId != null : "provided resourceIds should not be null";
String[] lockFiles = lockDirectory.listAll();
String lockNameForAcquirer = ((FileLockInfo) originalLockInfo).getLockForAcquirer(lockFiles);
String fileToLockName = FileLockInfo.LockFileUtils.getFileToLockNameFromLock(lockNameForAcquirer);
acquire(FileLockInfo.getLockInfoBuilder().withFileToLock(fileToLockName).withAcquirerId(clonedResourceId).build());
}

public void delete() throws IOException {
lockDirectory.delete();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.index.snapshots.IndexShardSnapshotStatus;
import org.opensearch.index.snapshots.blobstore.RemoteStoreShardShallowCopySnapshot;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.snapshots.SnapshotId;
import org.opensearch.snapshots.SnapshotInfo;
Expand Down Expand Up @@ -248,6 +249,18 @@ public void executeConsistentStateUpdate(
in.executeConsistentStateUpdate(createUpdateTask, source, onFailure);
}

@Override
public void cloneRemoteStoreIndexShardSnapshot(
SnapshotId source,
SnapshotId target,
RepositoryShardId shardId,
String shardGeneration,
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
ActionListener<String> listener
) {
in.cloneRemoteStoreIndexShardSnapshot(source, target, shardId, shardGeneration, remoteStoreLockManagerFactory, listener);
}

@Override
public void cloneShardSnapshot(
SnapshotId source,
Expand Down
Loading

0 comments on commit b2a94f4

Please sign in to comment.