diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotIT.java index 91faa410b387f..e8d645bb99ad2 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotIT.java @@ -38,11 +38,15 @@ import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.client.Client; import org.opensearch.cluster.SnapshotsInProgress; +import org.opensearch.cluster.metadata.RepositoryMetadata; +import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.IndexNotFoundException; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.RepositoryData; import org.opensearch.snapshots.mockstore.MockRepository; +import org.opensearch.test.FeatureFlagSetter; import org.opensearch.test.OpenSearchIntegTestCase; import java.util.ArrayList; @@ -64,6 +68,7 @@ import org.opensearch.repositories.blobstore.BlobStoreRepository; import java.nio.file.Path; +import java.util.concurrent.ExecutionException; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; @@ -171,6 +176,162 @@ public void testCloneSnapshotIndex() throws Exception { assertEquals(status1.getStats().getTotalSize(), status2.getStats().getTotalSize()); } + public void testCloneShallowSnapshotIndex() throws Exception { + disableRepoConsistencyCheck("This test uses remote store repository"); + FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE); + internalCluster().startClusterManagerOnlyNode(); + internalCluster().startDataOnlyNode(); + + final String snapshotRepoName = "snapshot-repo-name"; + final Path snapshotRepoPath = randomRepoPath(); + createRepository(snapshotRepoName, "fs", snapshotRepoPath); + + final String shallowSnapshotRepoName = "shallow-snapshot-repo-name"; + final Path shallowSnapshotRepoPath = randomRepoPath(); + createRepository(shallowSnapshotRepoName, "fs", snapshotRepoSettingsForShallowCopy(shallowSnapshotRepoPath)); + + final Path remoteStoreRepoPath = randomRepoPath(); + final String remoteStoreRepoName = "remote-store-repo-name"; + createRepository(remoteStoreRepoName, "fs", remoteStoreRepoPath); + + final String indexName = "index-1"; + createIndexWithRandomDocs(indexName, randomIntBetween(5, 10)); + + final String remoteStoreEnabledIndexName = "remote-index-1"; + final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(remoteStoreRepoName); + createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings); + indexRandomDocs(remoteStoreEnabledIndexName, randomIntBetween(5, 10)); + + final String snapshot = "snapshot"; + createFullSnapshot(snapshotRepoName, snapshot); + assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == 0); + + indexRandomDocs(indexName, randomIntBetween(20, 100)); + + final String shallowSnapshot = "shallow-snapshot"; + createFullSnapshot(shallowSnapshotRepoName, shallowSnapshot); + assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == 1); + + if (randomBoolean()) { + assertAcked(admin().indices().prepareDelete(indexName)); + } + + final String sourceSnapshot = shallowSnapshot; + final String targetSnapshot = "target-snapshot"; + assertAcked(startClone(shallowSnapshotRepoName, sourceSnapshot, targetSnapshot, indexName, remoteStoreEnabledIndexName).get()); + logger.info("Lock files count: {}", getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length); + assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == 2); + } + + public void testShallowCloneNameAvailability() throws Exception { + disableRepoConsistencyCheck("This test uses remote store repository"); + FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE); + internalCluster().startClusterManagerOnlyNode(LARGE_SNAPSHOT_POOL_SETTINGS); + internalCluster().startDataOnlyNode(); + + final String shallowSnapshotRepoName = "shallow-snapshot-repo-name"; + final Path shallowSnapshotRepoPath = randomRepoPath(); + createRepository(shallowSnapshotRepoName, "fs", snapshotRepoSettingsForShallowCopy(shallowSnapshotRepoPath)); + + final Path remoteStoreRepoPath = randomRepoPath(); + final String remoteStoreRepoName = "remote-store-repo-name"; + createRepository(remoteStoreRepoName, "fs", remoteStoreRepoPath); + + final String indexName = "index-1"; + createIndexWithRandomDocs(indexName, randomIntBetween(5, 10)); + + final String remoteStoreEnabledIndexName = "remote-index-1"; + final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(remoteStoreRepoName); + createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings); + indexRandomDocs(remoteStoreEnabledIndexName, randomIntBetween(5, 10)); + + final String shallowSnapshot1 = "snapshot1"; + createFullSnapshot(shallowSnapshotRepoName, shallowSnapshot1); + + final String shallowSnapshot2 = "snapshot2"; + createFullSnapshot(shallowSnapshotRepoName, shallowSnapshot2); + + ExecutionException ex = expectThrows( + ExecutionException.class, + () -> startClone(shallowSnapshotRepoName, shallowSnapshot1, shallowSnapshot2, indexName, remoteStoreEnabledIndexName).get() + ); + assertThat(ex.getMessage(), containsString("snapshot with the same name already exists")); + } + + public void testCloneAfterRepoShallowSettingEnabled() throws Exception { + disableRepoConsistencyCheck("This test uses remote store repository"); + FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE); + internalCluster().startClusterManagerOnlyNode(); + internalCluster().startDataOnlyNode(); + + final String snapshotRepoName = "snapshot-repo-name"; + final Path snapshotRepoPath = randomRepoPath(); + createRepository(snapshotRepoName, "fs", snapshotRepoPath); + + final Path remoteStoreRepoPath = randomRepoPath(); + final String remoteStoreRepoName = "remote-store-repo-name"; + createRepository(remoteStoreRepoName, "fs", remoteStoreRepoPath); + + final String indexName = "index-1"; + createIndexWithRandomDocs(indexName, randomIntBetween(5, 10)); + + final String remoteStoreEnabledIndexName = "remote-index-1"; + final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(remoteStoreRepoName); + createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings); + indexRandomDocs(remoteStoreEnabledIndexName, randomIntBetween(5, 10)); + + final String snapshot = "snapshot"; + createFullSnapshot(snapshotRepoName, snapshot); + assertEquals(getSnapshot(snapshotRepoName, snapshot).state(), SnapshotState.SUCCESS); + + // Updating the snapshot repository flag to enable shallow snapshots + createRepository(snapshotRepoName, "fs", snapshotRepoSettingsForShallowCopy(snapshotRepoPath)); + RepositoryMetadata updatedRepositoryMetadata = clusterAdmin().prepareGetRepositories(snapshotRepoName).get().repositories().get(0); + assertTrue(updatedRepositoryMetadata.settings().getAsBoolean(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), false)); + + final String targetSnapshot = "target-snapshot"; + assertAcked(startClone(snapshotRepoName, snapshot, targetSnapshot, indexName, remoteStoreEnabledIndexName).get()); + assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == 0); + assertEquals(getSnapshot(snapshotRepoName, targetSnapshot).isRemoteStoreIndexShallowCopyEnabled(), false); + } + + public void testCloneAfterRepoShallowSettingDisabled() throws Exception { + disableRepoConsistencyCheck("This test uses remote store repository"); + FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE); + internalCluster().startClusterManagerOnlyNode(); + internalCluster().startDataOnlyNode(); + + final String snapshotRepoName = "snapshot-repo-name"; + final Path snapshotRepoPath = randomRepoPath(); + createRepository(snapshotRepoName, "fs", snapshotRepoSettingsForShallowCopy(snapshotRepoPath)); + + final Path remoteStoreRepoPath = randomRepoPath(); + final String remoteStoreRepoName = "remote-store-repo-name"; + createRepository(remoteStoreRepoName, "fs", remoteStoreRepoPath); + + final String indexName = "index-1"; + createIndexWithRandomDocs(indexName, randomIntBetween(5, 10)); + + final String remoteStoreEnabledIndexName = "remote-index-1"; + final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(remoteStoreRepoName); + createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings); + indexRandomDocs(remoteStoreEnabledIndexName, randomIntBetween(5, 10)); + + final String snapshot = "snapshot"; + createFullSnapshot(snapshotRepoName, snapshot); + assertEquals(getSnapshot(snapshotRepoName, snapshot).state(), SnapshotState.SUCCESS); + + // Updating the snapshot repository flag to enable shallow snapshots + createRepository(snapshotRepoName, "fs", snapshotRepoPath); + RepositoryMetadata updatedRepositoryMetadata = clusterAdmin().prepareGetRepositories(snapshotRepoName).get().repositories().get(0); + assertFalse(updatedRepositoryMetadata.settings().getAsBoolean(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), false)); + + final String targetSnapshot = "target-snapshot"; + assertAcked(startClone(snapshotRepoName, snapshot, targetSnapshot, indexName, remoteStoreEnabledIndexName).get()); + assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == 2); + assertEquals(getSnapshot(snapshotRepoName, targetSnapshot).isRemoteStoreIndexShallowCopyEnabled(), true); + } + public void testClonePreventsSnapshotDelete() throws Exception { final String clusterManagerName = internalCluster().startClusterManagerOnlyNode(); internalCluster().startDataOnlyNode(); diff --git a/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java index 87287359b6e4c..b54dbdee20daf 100644 --- a/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java @@ -174,7 +174,8 @@ public static Entry startClone( version, source, Map.of(), - false // TODO: need to pull this value from the original snapshot, use whatever we set during snapshot create. + false // initialising to false, will be updated in startCloning method of SnapshotsService while updating entry with + // clone jobs ); } @@ -482,6 +483,26 @@ public Entry withClones(final Map update ); } + public Entry withRemoteStoreIndexShallowCopy(final boolean remoteStoreIndexShallowCopy) { + return new Entry( + snapshot, + includeGlobalState, + partial, + state, + indices, + dataStreams, + startTime, + repositoryStateId, + shards, + failure, + userMetadata, + version, + source, + clones, + remoteStoreIndexShallowCopy + ); + } + /** * Create a new instance by aborting this instance. Moving all in-progress shards to {@link ShardState#ABORTED} if assigned to a * data node or to {@link ShardState#FAILED} if not assigned to any data node. diff --git a/server/src/main/java/org/opensearch/index/snapshots/blobstore/RemoteStoreShardShallowCopySnapshot.java b/server/src/main/java/org/opensearch/index/snapshots/blobstore/RemoteStoreShardShallowCopySnapshot.java index 8e6ed870c904f..8cb9fd3cd3c63 100644 --- a/server/src/main/java/org/opensearch/index/snapshots/blobstore/RemoteStoreShardShallowCopySnapshot.java +++ b/server/src/main/java/org/opensearch/index/snapshots/blobstore/RemoteStoreShardShallowCopySnapshot.java @@ -408,4 +408,29 @@ private void verifyParameters( throw new IllegalArgumentException(exceptionStr); } } + + /** + * Creates a new instance which has a different name and zero incremental file counts but is identical to this instance in terms of the files + * it references. + * + * @param targetSnapshotName target snapshot name + * @param startTime time the clone operation on the repository was started + * @param time time it took to create the clone + */ + public RemoteStoreShardShallowCopySnapshot asClone(String targetSnapshotName, long startTime, long time) { + return new RemoteStoreShardShallowCopySnapshot( + targetSnapshotName, + indexVersion, + primaryTerm, + commitGeneration, + startTime, + time, + totalFileCount, + totalSize, + indexUUID, + remoteStoreRepository, + repositoryBasePath, + fileNames + ); + } } diff --git a/server/src/main/java/org/opensearch/index/store/lockmanager/FileLockInfo.java b/server/src/main/java/org/opensearch/index/store/lockmanager/FileLockInfo.java index a8fb7bf20c393..24f42743e1a04 100644 --- a/server/src/main/java/org/opensearch/index/store/lockmanager/FileLockInfo.java +++ b/server/src/main/java/org/opensearch/index/store/lockmanager/FileLockInfo.java @@ -8,6 +8,7 @@ package org.opensearch.index.store.lockmanager; +import java.nio.file.NoSuchFileException; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; @@ -50,13 +51,21 @@ String getLockPrefix() { return fileToLock + RemoteStoreLockManagerUtils.SEPARATOR; } - List getLocksForAcquirer(String[] lockFiles) { + String getLockForAcquirer(String[] lockFiles) throws NoSuchFileException { if (acquirerId == null || acquirerId.isBlank()) { throw new IllegalArgumentException("Acquirer ID should be provided"); } - return Arrays.stream(lockFiles) + List locksForAcquirer = Arrays.stream(lockFiles) .filter(lockFile -> acquirerId.equals(LockFileUtils.getAcquirerIdFromLock(lockFile))) .collect(Collectors.toList()); + + if (locksForAcquirer.isEmpty()) { + throw new NoSuchFileException("No lock file found for the acquirer: " + acquirerId); + } + if (locksForAcquirer.size() != 1) { + throw new IllegalStateException("Expected single lock file but found [" + locksForAcquirer.size() + "] lock files"); + } + return locksForAcquirer.get(0); } public static LockInfoBuilder getLockInfoBuilder() { diff --git a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManager.java b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManager.java index c30be082b4795..9eb066d9e955e 100644 --- a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManager.java +++ b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManager.java @@ -39,6 +39,16 @@ public interface RemoteStoreLockManager { */ Boolean isAcquired(LockInfo lockInfo) throws IOException; + /** + * Acquires lock on the file mentioned in originalLockInfo for acquirer mentioned in clonedLockInfo. + * There can occur a race condition where the original file is deleted before we can use it to acquire lock for the new acquirer. Until we have a + * fix on LockManager side, Implementors must ensure thread safety for this operation. + * @param originalLockInfo lock info instance for original lock. + * @param clonedLockInfo lock info instance for which lock needs to be cloned. + * @throws IOException throws IOException if originalResource itself do not have any lock. + */ + void cloneLock(LockInfo originalLockInfo, LockInfo clonedLockInfo) throws IOException; + /* Deletes all lock related files and directories */ diff --git a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java index 7df20cae10664..fd7906729e314 100644 --- a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java +++ b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java @@ -15,8 +15,9 @@ import org.opensearch.index.store.RemoteBufferedOutputDirectory; import java.io.IOException; +import java.nio.file.NoSuchFileException; import java.util.Collection; -import java.util.List; +import java.util.Objects; /** * A Class that implements Remote Store Lock Manager by creating lock files for the remote store files that needs to @@ -48,6 +49,7 @@ public void acquire(LockInfo lockInfo) throws IOException { /** * Releases Locks acquired by a given acquirer which is passed in LockInfo Instance. + * If the lock file doesn't exist for the acquirer, release will be a no-op. * Right now this method is only used to release locks for a given acquirer, * This can be extended in future to handle other cases as well, like: * - release lock for given fileToLock and AcquirerId @@ -59,15 +61,12 @@ public void acquire(LockInfo lockInfo) throws IOException { public void release(LockInfo lockInfo) throws IOException { assert lockInfo instanceof FileLockInfo : "lockInfo should be instance of FileLockInfo"; String[] lockFiles = lockDirectory.listAll(); - - // ideally there should be only one lock per acquirer, but just to handle any stale locks, - // we try to release all the locks for the acquirer. - List locksToRelease = ((FileLockInfo) lockInfo).getLocksForAcquirer(lockFiles); - if (locksToRelease.size() > 1) { - logger.warn(locksToRelease.size() + " locks found for acquirer " + ((FileLockInfo) lockInfo).getAcquirerId()); - } - for (String lock : locksToRelease) { - lockDirectory.deleteFile(lock); + try { + String lockToRelease = ((FileLockInfo) lockInfo).getLockForAcquirer(lockFiles); + lockDirectory.deleteFile(lockToRelease); + } catch (NoSuchFileException e) { + // Ignoring if the file to be deleted is not present. + logger.info("No lock file found for acquirerId: {}", ((FileLockInfo) lockInfo).getAcquirerId()); } } @@ -84,6 +83,27 @@ public Boolean isAcquired(LockInfo lockInfo) throws IOException { return !lockFiles.isEmpty(); } + /** + * Acquires lock on the file mentioned in originalLockInfo for acquirer mentioned in clonedLockInfo. + * Snapshot layer enforces thread safety by having checks in place to ensure that the source snapshot is not being deleted before proceeding + * with the clone operation. Hence, the original lock file would always be present while acquiring the lock for cloned snapshot. + * @param originalLockInfo lock info instance for original lock. + * @param clonedLockInfo lock info instance for which lock needs to be cloned. + * @throws IOException throws IOException if originalResource itself do not have any lock. + */ + @Override + public void cloneLock(LockInfo originalLockInfo, LockInfo clonedLockInfo) throws IOException { + assert originalLockInfo instanceof FileLockInfo : "originalLockInfo should be instance of FileLockInfo"; + assert clonedLockInfo instanceof FileLockInfo : "clonedLockInfo should be instance of FileLockInfo"; + String originalResourceId = Objects.requireNonNull(((FileLockInfo) originalLockInfo).getAcquirerId()); + String clonedResourceId = Objects.requireNonNull(((FileLockInfo) clonedLockInfo).getAcquirerId()); + assert originalResourceId != null && clonedResourceId != null : "provided resourceIds should not be null"; + String[] lockFiles = lockDirectory.listAll(); + String lockNameForAcquirer = ((FileLockInfo) originalLockInfo).getLockForAcquirer(lockFiles); + String fileToLockName = FileLockInfo.LockFileUtils.getFileToLockNameFromLock(lockNameForAcquirer); + acquire(FileLockInfo.getLockInfoBuilder().withFileToLock(fileToLockName).withAcquirerId(clonedResourceId).build()); + } + public void delete() throws IOException { lockDirectory.delete(); } diff --git a/server/src/main/java/org/opensearch/repositories/FilterRepository.java b/server/src/main/java/org/opensearch/repositories/FilterRepository.java index de46459601096..5c52a7c72d405 100644 --- a/server/src/main/java/org/opensearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/opensearch/repositories/FilterRepository.java @@ -47,6 +47,7 @@ import org.opensearch.index.snapshots.IndexShardSnapshotStatus; import org.opensearch.index.snapshots.blobstore.RemoteStoreShardShallowCopySnapshot; import org.opensearch.index.store.Store; +import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.snapshots.SnapshotId; import org.opensearch.snapshots.SnapshotInfo; @@ -254,6 +255,18 @@ public void executeConsistentStateUpdate( in.executeConsistentStateUpdate(createUpdateTask, source, onFailure); } + @Override + public void cloneRemoteStoreIndexShardSnapshot( + SnapshotId source, + SnapshotId target, + RepositoryShardId shardId, + String shardGeneration, + RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, + ActionListener listener + ) { + in.cloneRemoteStoreIndexShardSnapshot(source, target, shardId, shardGeneration, remoteStoreLockManagerFactory, listener); + } + @Override public void cloneShardSnapshot( SnapshotId source, diff --git a/server/src/main/java/org/opensearch/repositories/Repository.java b/server/src/main/java/org/opensearch/repositories/Repository.java index 5dba7e4403075..81e842217ad9a 100644 --- a/server/src/main/java/org/opensearch/repositories/Repository.java +++ b/server/src/main/java/org/opensearch/repositories/Repository.java @@ -48,6 +48,7 @@ import org.opensearch.index.snapshots.IndexShardSnapshotStatus; import org.opensearch.index.snapshots.blobstore.RemoteStoreShardShallowCopySnapshot; import org.opensearch.index.store.Store; +import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.snapshots.SnapshotId; import org.opensearch.snapshots.SnapshotInfo; @@ -389,6 +390,27 @@ void cloneShardSnapshot( ActionListener listener ); + /** + * Clones a remote store index shard snapshot. + * + * @param source source snapshot + * @param target target snapshot + * @param shardId shard id + * @param shardGeneration shard generation in repo + * @param remoteStoreLockManagerFactory remoteStoreLockManagerFactory for cloning metadata lock file + * @param listener listener to complete with new shard generation once clone has completed + */ + default void cloneRemoteStoreIndexShardSnapshot( + SnapshotId source, + SnapshotId target, + RepositoryShardId shardId, + @Nullable String shardGeneration, + RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, + ActionListener listener + ) { + throw new UnsupportedOperationException(); + } + /** * Hook that allows a repository to filter the user supplied snapshot metadata in {@link SnapshotsInProgress.Entry#userMetadata()} * during snapshot initialization. diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index edcab05c757f7..178aec4b47f47 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -114,6 +114,9 @@ import org.opensearch.index.snapshots.blobstore.SnapshotFiles; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.index.store.lockmanager.FileLockInfo; +import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory; +import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.repositories.IndexId; @@ -510,8 +513,8 @@ public void cloneShardSnapshot( executor.execute(ActionRunnable.supply(listener, () -> { final long startTime = threadPool.absoluteTimeInMillis(); final BlobContainer shardContainer = shardContainer(index, shardNum); - final BlobStoreIndexShardSnapshots existingSnapshots; final String newGen; + final BlobStoreIndexShardSnapshots existingSnapshots; final String existingShardGen; if (shardGeneration == null) { Tuple tuple = buildBlobStoreIndexShardSnapshots( @@ -564,6 +567,9 @@ public void cloneShardSnapshot( + "]. A snapshot by that name already exists for this shard." ); } + // We don't need to check if there exists a shallow snapshot with the same name as we have the check before starting the clone + // operation ensuring that the snapshot name is available by checking the repository data. Also, the new clone snapshot would + // have a different UUID and hence a new unique snap-N file will be created. final BlobStoreIndexShardSnapshot sourceMeta = loadShardSnapshot(shardContainer, source); logger.trace("[{}] [{}] writing shard snapshot file for clone", shardId, target); INDEX_SHARD_SNAPSHOT_FORMAT.write( @@ -582,6 +588,50 @@ public void cloneShardSnapshot( })); } + @Override + public void cloneRemoteStoreIndexShardSnapshot( + SnapshotId source, + SnapshotId target, + RepositoryShardId shardId, + @Nullable String shardGeneration, + RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, + ActionListener listener + ) { + if (isReadOnly()) { + listener.onFailure(new RepositoryException(metadata.name(), "cannot clone shard snapshot on a readonly repository")); + return; + } + final IndexId index = shardId.index(); + final int shardNum = shardId.shardId(); + final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); + executor.execute(ActionRunnable.supply(listener, () -> { + final long startTime = threadPool.relativeTimeInMillis(); + final BlobContainer shardContainer = shardContainer(index, shardNum); + // We don't need to check if there exists a shallow/full copy snapshot with the same name as we have the check before starting + // the clone operation ensuring that the snapshot name is available by checking the repository data. Also, the new clone shallow + // snapshot would have a different UUID and hence a new unique shallow-snap-N file will be created. + RemoteStoreShardShallowCopySnapshot remStoreBasedShardMetadata = loadShallowCopyShardSnapshot(shardContainer, source); + String indexUUID = remStoreBasedShardMetadata.getIndexUUID(); + String remoteStoreRepository = remStoreBasedShardMetadata.getRemoteStoreRepository(); + RemoteStoreMetadataLockManager remoteStoreMetadataLockManger = remoteStoreLockManagerFactory.newLockManager( + remoteStoreRepository, + indexUUID, + String.valueOf(shardId.shardId()) + ); + remoteStoreMetadataLockManger.cloneLock( + FileLockInfo.getLockInfoBuilder().withAcquirerId(source.getUUID()).build(), + FileLockInfo.getLockInfoBuilder().withAcquirerId(target.getUUID()).build() + ); + REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.write( + remStoreBasedShardMetadata.asClone(target.getName(), startTime, threadPool.absoluteTimeInMillis() - startTime), + shardContainer, + target.getUUID(), + compressor + ); + return shardGeneration; + })); + } + // Inspects all cluster state elements that contain a hint about what the current repository generation is and updates // #latestKnownRepoGen if a newer than currently known generation is found @Override diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java b/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java index e7d6eb5e9525e..d6810742341be 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java @@ -544,6 +544,7 @@ public Boolean includeGlobalState() { return includeGlobalState; } + @Nullable public Boolean isRemoteStoreIndexShallowCopyEnabled() { return remoteStoreIndexShallowCopy; } diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java index 93bb1db39df73..091143d30a983 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java @@ -92,6 +92,7 @@ import org.opensearch.common.util.concurrent.AbstractRunnable; import org.opensearch.index.Index; import org.opensearch.index.shard.ShardId; +import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory; import org.opensearch.repositories.IndexId; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; @@ -171,6 +172,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus private final RepositoriesService repositoriesService; + private final RemoteStoreLockManagerFactory remoteStoreLockManagerFactory; + private final ThreadPool threadPool; private final Map>>> snapshotCompletionListeners = @@ -229,6 +232,7 @@ public SnapshotsService( this.clusterService = clusterService; this.indexNameExpressionResolver = indexNameExpressionResolver; this.repositoriesService = repositoriesService; + this.remoteStoreLockManagerFactory = new RemoteStoreLockManagerFactory(() -> repositoriesService); this.threadPool = transportService.getThreadPool(); this.transportService = transportService; @@ -800,7 +804,11 @@ public ClusterState execute(ClusterState currentState) { } } } - updatedEntry = cloneEntry.withClones(clonesBuilder); + updatedEntry = cloneEntry.withClones(clonesBuilder) + .withRemoteStoreIndexShallowCopy( + Boolean.TRUE.equals(snapshotInfoListener.result().isRemoteStoreIndexShallowCopyEnabled()) + ); + ; updatedEntries.set(i, updatedEntry); changed = true; break; @@ -828,7 +836,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS continue; } final RepositoryShardId repoShardId = indexClone.getKey(); - runReadyClone(target, sourceSnapshot, shardStatusBefore, repoShardId, repository); + final boolean remoteStoreIndexShallowCopy = Boolean.TRUE.equals(updatedEntry.remoteStoreIndexShallowCopy()); + runReadyClone(target, sourceSnapshot, shardStatusBefore, repoShardId, repository, remoteStoreIndexShallowCopy); } } else { // Extremely unlikely corner case of cluster-manager failing over between between starting the clone and @@ -846,60 +855,112 @@ private void runReadyClone( SnapshotId sourceSnapshot, ShardSnapshotStatus shardStatusBefore, RepositoryShardId repoShardId, - Repository repository + Repository repository, + boolean remoteStoreIndexShallowCopy ) { - final SnapshotId targetSnapshot = target.getSnapshotId(); - final String localNodeId = clusterService.localNode().getId(); - if (currentlyCloning.add(repoShardId)) { - repository.cloneShardSnapshot( - sourceSnapshot, - targetSnapshot, - repoShardId, - shardStatusBefore.generation(), - ActionListener.wrap( - generation -> innerUpdateSnapshotState( - new ShardSnapshotUpdate(target, repoShardId, new ShardSnapshotStatus(localNodeId, ShardState.SUCCESS, generation)), - ActionListener.runBefore( - ActionListener.wrap( - v -> logger.trace( - "Marked [{}] as successfully cloned from [{}] to [{}]", + final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); + executor.execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + logger.warn( + "Failed to get repository data while cloning shard [{}] from [{}] to [{}]", + repoShardId, + sourceSnapshot, + target.getSnapshotId() + ); + failCloneShardAndUpdateClusterState(target, sourceSnapshot, repoShardId); + } + + @Override + protected void doRun() { + final String localNodeId = clusterService.localNode().getId(); + repository.getRepositoryData(ActionListener.wrap(repositoryData -> { + try { + final IndexMetadata indexMetadata = repository.getSnapshotIndexMetaData( + repositoryData, + sourceSnapshot, + repoShardId.index() + ); + final boolean cloneRemoteStoreIndexShardSnapshot = remoteStoreIndexShallowCopy + && indexMetadata.getSettings().getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false); + final SnapshotId targetSnapshot = target.getSnapshotId(); + final ActionListener listener = ActionListener.wrap( + generation -> innerUpdateSnapshotState( + new ShardSnapshotUpdate( + target, repoShardId, - sourceSnapshot, - targetSnapshot + new ShardSnapshotStatus(localNodeId, ShardState.SUCCESS, generation) ), - e -> { - logger.warn("Cluster state update after successful shard clone [{}] failed", repoShardId); - failAllListenersOnMasterFailOver(e); - } + ActionListener.runBefore( + ActionListener.wrap( + v -> logger.trace( + "Marked [{}] as successfully cloned from [{}] to [{}]", + repoShardId, + sourceSnapshot, + targetSnapshot + ), + e -> { + logger.warn("Cluster state update after successful shard clone [{}] failed", repoShardId); + failAllListenersOnMasterFailOver(e); + } + ), + () -> currentlyCloning.remove(repoShardId) + ) ), - () -> currentlyCloning.remove(repoShardId) - ) - ), - e -> innerUpdateSnapshotState( - new ShardSnapshotUpdate( - target, - repoShardId, - new ShardSnapshotStatus(localNodeId, ShardState.FAILED, "failed to clone shard snapshot", null) - ), - ActionListener.runBefore( - ActionListener.wrap( - v -> logger.trace( - "Marked [{}] as failed clone from [{}] to [{}]", + e -> { + logger.warn("Exception [{}] while trying to clone shard [{}]", e, repoShardId); + failCloneShardAndUpdateClusterState(target, sourceSnapshot, repoShardId); + } + ); + if (currentlyCloning.add(repoShardId)) { + if (cloneRemoteStoreIndexShardSnapshot) { + repository.cloneRemoteStoreIndexShardSnapshot( + sourceSnapshot, + targetSnapshot, repoShardId, + shardStatusBefore.generation(), + remoteStoreLockManagerFactory, + listener + ); + } else { + repository.cloneShardSnapshot( sourceSnapshot, - targetSnapshot - ), - ex -> { - logger.warn("Cluster state update after failed shard clone [{}] failed", repoShardId); - failAllListenersOnMasterFailOver(ex); - } - ), - () -> currentlyCloning.remove(repoShardId) - ) - ) - ) - ); - } + targetSnapshot, + repoShardId, + shardStatusBefore.generation(), + listener + ); + } + } + } catch (IOException e) { + logger.warn("Failed to get index-metadata from repository data for index [{}]", repoShardId.index().getName()); + failCloneShardAndUpdateClusterState(target, sourceSnapshot, repoShardId); + } + }, this::onFailure)); + } + }); + } + + private void failCloneShardAndUpdateClusterState(Snapshot target, SnapshotId sourceSnapshot, RepositoryShardId repoShardId) { + // Stale blobs/lock-files will be cleaned up during delete/cleanup operation. + final String localNodeId = clusterService.localNode().getId(); + innerUpdateSnapshotState( + new ShardSnapshotUpdate( + target, + repoShardId, + new ShardSnapshotStatus(localNodeId, ShardState.FAILED, "failed to clone shard snapshot", null) + ), + ActionListener.runBefore( + ActionListener.wrap( + v -> logger.trace("Marked [{}] as failed clone from [{}] to [{}]", repoShardId, sourceSnapshot, target.getSnapshotId()), + ex -> { + logger.warn("Cluster state update after failed shard clone [{}] failed", repoShardId); + failAllListenersOnMasterFailOver(ex); + } + ), + () -> currentlyCloning.remove(repoShardId) + ) + ); } private void ensureBelowConcurrencyLimit( @@ -3652,12 +3713,14 @@ private void startExecutableClones(SnapshotsInProgress snapshotsInProgress, @Nul // this is a clone, see if new work is ready for (final Map.Entry clone : entry.clones().entrySet()) { if (clone.getValue().state() == ShardState.INIT) { + final boolean remoteStoreIndexShallowCopy = Boolean.TRUE.equals(entry.remoteStoreIndexShallowCopy()); runReadyClone( entry.snapshot(), entry.source(), clone.getValue(), clone.getKey(), - repositoriesService.repository(entry.repository()) + repositoriesService.repository(entry.repository()), + remoteStoreIndexShallowCopy ); } } diff --git a/server/src/test/java/org/opensearch/index/store/lockmanager/FileLockInfoTests.java b/server/src/test/java/org/opensearch/index/store/lockmanager/FileLockInfoTests.java index 95af53cb6e5ec..f3a2f1859923e 100644 --- a/server/src/test/java/org/opensearch/index/store/lockmanager/FileLockInfoTests.java +++ b/server/src/test/java/org/opensearch/index/store/lockmanager/FileLockInfoTests.java @@ -10,7 +10,7 @@ import org.opensearch.test.OpenSearchTestCase; -import java.util.List; +import java.nio.file.NoSuchFileException; public class FileLockInfoTests extends OpenSearchTestCase { String testMetadata = "testMetadata"; @@ -41,16 +41,13 @@ public void testGetLockPrefixFailureCase() { assertThrows(IllegalArgumentException.class, fileLockInfo::getLockPrefix); } - public void testGetLocksForAcquirer() { + public void testGetLocksForAcquirer() throws NoSuchFileException { String[] locks = new String[] { FileLockInfo.LockFileUtils.generateLockName(testMetadata, testAcquirerId), FileLockInfo.LockFileUtils.generateLockName(testMetadata, "acquirerId2") }; FileLockInfo fileLockInfo = FileLockInfo.getLockInfoBuilder().withAcquirerId(testAcquirerId).build(); - assertEquals( - fileLockInfo.getLocksForAcquirer(locks), - List.of(FileLockInfo.LockFileUtils.generateLockName(testMetadata, testAcquirerId)) - ); + assertEquals(fileLockInfo.getLockForAcquirer(locks), FileLockInfo.LockFileUtils.generateLockName(testMetadata, testAcquirerId)); } } diff --git a/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java index ebd68fe5b544d..490418401e469 100644 --- a/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java @@ -60,6 +60,7 @@ import org.opensearch.index.snapshots.IndexShardSnapshotStatus; import org.opensearch.index.snapshots.blobstore.RemoteStoreShardShallowCopySnapshot; import org.opensearch.index.store.Store; +import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.repositories.blobstore.MeteredBlobStoreRepository; @@ -383,6 +384,18 @@ public void cloneShardSnapshot( } + @Override + public void cloneRemoteStoreIndexShardSnapshot( + SnapshotId source, + SnapshotId target, + RepositoryShardId shardId, + String shardGeneration, + RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, + ActionListener listener + ) { + + } + @Override public Lifecycle.State lifecycleState() { return null; diff --git a/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java index 5ed85fedc8cea..e1d8e26380698 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java @@ -44,6 +44,7 @@ import org.opensearch.index.mapper.MapperService; import org.opensearch.index.snapshots.IndexShardSnapshotStatus; import org.opensearch.index.store.Store; +import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory; import org.opensearch.repositories.IndexId; import org.opensearch.repositories.IndexMetaDataGenerations; import org.opensearch.repositories.Repository; @@ -209,4 +210,16 @@ public void cloneShardSnapshot( ) { throw new UnsupportedOperationException("Unsupported for restore-only repository"); } + + @Override + public void cloneRemoteStoreIndexShardSnapshot( + SnapshotId source, + SnapshotId target, + RepositoryShardId shardId, + String shardGeneration, + RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, + ActionListener listener + ) { + throw new UnsupportedOperationException("Unsupported for restore-only repository"); + } } diff --git a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java index 7795d6126afc4..a0677dc75995f 100644 --- a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -51,6 +51,8 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Strings; import org.opensearch.common.UUIDs; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.compress.CompressorType; import org.opensearch.common.settings.Settings; @@ -61,6 +63,9 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.index.IndexModule; +import org.opensearch.index.store.RemoteBufferedOutputDirectory; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.node.NodeClosedException; import org.opensearch.plugins.Plugin; import org.opensearch.repositories.RepositoriesService; @@ -512,6 +517,26 @@ protected void indexRandomDocs(String index, int numdocs) throws InterruptedExce assertDocCount(index, numdocs); } + protected Settings getRemoteStoreBackedIndexSettings(String remoteStoreRepo) { + return Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "1") + .put("index.refresh_interval", "300s") + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1") + .put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.FS.getSettingsKey()) + .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, remoteStoreRepo) + .build(); + } + + protected Settings.Builder snapshotRepoSettingsForShallowCopy(Path path) { + final Settings.Builder settings = Settings.builder(); + settings.put("location", path); + settings.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), Boolean.TRUE); + return settings; + } + protected long getCountForIndex(String indexName) { return client().search( new SearchRequest(new SearchRequest(indexName).source(new SearchSourceBuilder().size(0).trackTotalHits(true))) @@ -522,6 +547,21 @@ protected void assertDocCount(String index, long count) { assertEquals(getCountForIndex(index), count); } + protected String[] getLockFilesInRemoteStore(String remoteStoreIndex, String remoteStoreRepositoryName) throws IOException { + String indexUUID = client().admin() + .indices() + .prepareGetSettings(remoteStoreIndex) + .get() + .getSetting(remoteStoreIndex, IndexMetadata.SETTING_INDEX_UUID); + final RepositoriesService repositoriesService = internalCluster().getCurrentClusterManagerNodeInstance(RepositoriesService.class); + final BlobStoreRepository remoteStoreRepository = (BlobStoreRepository) repositoriesService.repository(remoteStoreRepositoryName); + BlobPath shardLevelBlobPath = remoteStoreRepository.basePath().add(indexUUID).add("0").add("segments").add("lock_files"); + BlobContainer blobContainer = remoteStoreRepository.blobStore().blobContainer(shardLevelBlobPath); + try (RemoteBufferedOutputDirectory lockDirectory = new RemoteBufferedOutputDirectory(blobContainer)) { + return lockDirectory.listAll(); + } + } + /** * Adds a snapshot in state {@link SnapshotState#FAILED} to the given repository. *