diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java index 34ff64bc5333a..0febd3112fbb3 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.StepListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterState; @@ -41,6 +42,7 @@ import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryCleanupResult; +import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.threadpool.ThreadPool; @@ -167,97 +169,103 @@ private void cleanupRepo(String repositoryName, ActionListener repositoryDataListener = new StepListener<>(); + repository.getRepositoryData(repositoryDataListener); + repositoryDataListener.whenComplete(repositoryData -> { + final long repositoryStateId = repositoryData.getGenId(); + logger.info("Running cleanup operations on repository [{}][{}]", repositoryName, repositoryStateId); + clusterService.submitStateUpdateTask("cleanup repository [" + repositoryName + "][" + repositoryStateId + ']', + new ClusterStateUpdateTask() { - private boolean startedCleanup = false; + private boolean startedCleanup = false; - @Override - public ClusterState execute(ClusterState currentState) { - final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE); - if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) { - throw new IllegalStateException( - "Cannot cleanup [" + repositoryName + "] - a repository cleanup is already in-progress in [" - + repositoryCleanupInProgress + "]"); - } - SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE); - if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) { - throw new IllegalStateException("Cannot cleanup [" + repositoryName - + "] - a snapshot is currently being deleted in [" + deletionsInProgress + "]"); - } - SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); - if (snapshots != null && !snapshots.entries().isEmpty()) { - throw new IllegalStateException( - "Cannot cleanup [" + repositoryName + "] - a snapshot is currently running in [" + snapshots + "]"); + @Override + public ClusterState execute(ClusterState currentState) { + final RepositoryCleanupInProgress repositoryCleanupInProgress = + currentState.custom(RepositoryCleanupInProgress.TYPE); + if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) { + throw new IllegalStateException( + "Cannot cleanup [" + repositoryName + "] - a repository cleanup is already in-progress in [" + + repositoryCleanupInProgress + "]"); + } + SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE); + if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) { + throw new IllegalStateException("Cannot cleanup [" + repositoryName + + "] - a snapshot is currently being deleted in [" + deletionsInProgress + "]"); + } + SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); + if (snapshots != null && !snapshots.entries().isEmpty()) { + throw new IllegalStateException( + "Cannot cleanup [" + repositoryName + "] - a snapshot is currently running in [" + snapshots + "]"); + } + return ClusterState.builder(currentState).putCustom(RepositoryCleanupInProgress.TYPE, + new RepositoryCleanupInProgress( + RepositoryCleanupInProgress.startedEntry(repositoryName, repositoryStateId))).build(); } - return ClusterState.builder(currentState).putCustom(RepositoryCleanupInProgress.TYPE, - new RepositoryCleanupInProgress( - RepositoryCleanupInProgress.startedEntry(repositoryName, repositoryStateId))).build(); - } - - @Override - public void onFailure(String source, Exception e) { - after(e, null); - } - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - startedCleanup = true; - logger.debug("Initialized repository cleanup in cluster state for [{}][{}]", repositoryName, repositoryStateId); - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, - l -> blobStoreRepository.cleanup( - repositoryStateId, - newState.nodes().getMinNodeVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION), - ActionListener.wrap(result -> after(null, result), e -> after(e, null))))); - } - - private void after(@Nullable Exception failure, @Nullable RepositoryCleanupResult result) { - if (failure == null) { - logger.debug("Finished repository cleanup operations on [{}][{}]", repositoryName, repositoryStateId); - } else { - logger.debug(() -> new ParameterizedMessage( - "Failed to finish repository cleanup operations on [{}][{}]", repositoryName, repositoryStateId), failure); + @Override + public void onFailure(String source, Exception e) { + after(e, null); } - assert failure != null || result != null; - if (startedCleanup == false) { - logger.debug("No cleanup task to remove from cluster state because we failed to start one", failure); - listener.onFailure(failure); - return; + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + startedCleanup = true; + logger.debug("Initialized repository cleanup in cluster state for [{}][{}]", repositoryName, repositoryStateId); + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, + l -> blobStoreRepository.cleanup( + repositoryStateId, + newState.nodes().getMinNodeVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION), + ActionListener.wrap(result -> after(null, result), e -> after(e, null))))); } - clusterService.submitStateUpdateTask( - "remove repository cleanup task [" + repositoryName + "][" + repositoryStateId + ']', - new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) { - return removeInProgressCleanup(currentState); - } - @Override - public void onFailure(String source, Exception e) { - if (failure != null) { - e.addSuppressed(failure); + private void after(@Nullable Exception failure, @Nullable RepositoryCleanupResult result) { + if (failure == null) { + logger.debug("Finished repository cleanup operations on [{}][{}]", repositoryName, repositoryStateId); + } else { + logger.debug(() -> new ParameterizedMessage( + "Failed to finish repository cleanup operations on [{}][{}]", repositoryName, repositoryStateId), failure); + } + assert failure != null || result != null; + if (startedCleanup == false) { + logger.debug("No cleanup task to remove from cluster state because we failed to start one", failure); + listener.onFailure(failure); + return; + } + clusterService.submitStateUpdateTask( + "remove repository cleanup task [" + repositoryName + "][" + repositoryStateId + ']', + new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + return removeInProgressCleanup(currentState); + } + + @Override + public void onFailure(String source, Exception e) { + if (failure != null) { + e.addSuppressed(failure); + } + logger.warn(() -> + new ParameterizedMessage("[{}] failed to remove repository cleanup task", repositoryName), e); + listener.onFailure(e); } - logger.warn(() -> - new ParameterizedMessage("[{}] failed to remove repository cleanup task", repositoryName), e); - listener.onFailure(e); - } - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - if (failure == null) { - logger.info("Done with repository cleanup on [{}][{}] with result [{}]", - repositoryName, repositoryStateId, result); - listener.onResponse(result); - } else { - logger.warn(() -> new ParameterizedMessage("Failed to run repository cleanup operations on [{}][{}]", - repositoryName, repositoryStateId), failure); - listener.onFailure(failure); + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + if (failure == null) { + logger.info("Done with repository cleanup on [{}][{}] with result [{}]", + repositoryName, repositoryStateId, result); + listener.onResponse(result); + } else { + logger.warn(() -> new ParameterizedMessage( + "Failed to run repository cleanup operations on [{}][{}]", + repositoryName, repositoryStateId), failure); + listener.onFailure(failure); + } } - } - }); - } - }); + }); + } + }); + }, listener::onFailure); } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index c0c4473012621..45e74c888fa7f 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -483,15 +483,21 @@ private void restore(IndexShard indexShard, Repository repository, SnapshotRecov translogState.totalOperations(0); translogState.totalOperationsOnStart(0); indexShard.prepareForIndexRecovery(); - ShardId snapshotShardId = shardId; + final ShardId snapshotShardId; final String indexName = restoreSource.index(); if (!shardId.getIndexName().equals(indexName)) { snapshotShardId = new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, shardId.id()); + } else { + snapshotShardId = shardId; } - final IndexId indexId = repository.getRepositoryData().resolveIndexId(indexName); - assert indexShard.getEngineOrNull() == null; - repository.restoreShard(indexShard.store(), restoreSource.snapshot().getSnapshotId(), indexId, snapshotShardId, - indexShard.recoveryState(), restoreListener); + repository.getRepositoryData(ActionListener.wrap( + repositoryData -> { + final IndexId indexId = repositoryData.resolveIndexId(indexName); + assert indexShard.getEngineOrNull() == null; + repository.restoreShard(indexShard.store(), restoreSource.snapshot().getSnapshotId(), indexId, snapshotShardId, + indexShard.recoveryState(), restoreListener); + }, restoreListener::onFailure + )); } catch (Exception e) { restoreListener.onFailure(e); } diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java index b1c1973df9c37..f27277d58d9ab 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java @@ -68,8 +68,8 @@ public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId ind } @Override - public RepositoryData getRepositoryData() { - return in.getRepositoryData(); + public void getRepositoryData(ActionListener listener) { + in.getRepositoryData(listener); } @Override diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index cdfda72fa94b3..4bdc446d03bf2 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -105,7 +105,7 @@ default Repository create(RepositoryMetaData metaData, Function listener); /** * Starts snapshotting process diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 128fa1a199c9c..63f0b467748a1 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -116,6 +116,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -758,11 +759,13 @@ public void finalizeSnapshot(final SnapshotId snapshotId, // directory if all nodes are at least at version SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION // If there are older version nodes in the cluster, we don't need to run this cleanup as it will have already happened // when writing the index-${N} to each shard directory. + final Consumer onUpdateFailure = + e -> listener.onFailure(new SnapshotException(metadata.name(), snapshotId, "failed to update snapshot in repository", e)); final ActionListener allMetaListener = new GroupedActionListener<>( ActionListener.wrap(snapshotInfos -> { - assert snapshotInfos.size() == 1 : "Should have only received a single SnapshotInfo but received " + snapshotInfos; - final SnapshotInfo snapshotInfo = snapshotInfos.iterator().next(); - final RepositoryData existingRepositoryData = getRepositoryData(); + assert snapshotInfos.size() == 1 : "Should have only received a single SnapshotInfo but received " + snapshotInfos; + final SnapshotInfo snapshotInfo = snapshotInfos.iterator().next(); + getRepositoryData(ActionListener.wrap(existingRepositoryData -> { final RepositoryData updatedRepositoryData = existingRepositoryData.addSnapshot(snapshotId, snapshotInfo.state(), shardGenerations); writeIndexGen(updatedRepositoryData, repositoryStateId, writeShardGens); @@ -770,9 +773,8 @@ public void finalizeSnapshot(final SnapshotId snapshotId, cleanupOldShardGens(existingRepositoryData, updatedRepositoryData); } listener.onResponse(snapshotInfo); - }, - e -> listener.onFailure(new SnapshotException(metadata.name(), snapshotId, "failed to update snapshot in repository", e))), - 2 + indices.size()); + }, onUpdateFailure)); + }, onUpdateFailure), 2 + indices.size()); final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); // We ignore all FileAlreadyExistsException when writing metadata since otherwise a master failover while in this method will @@ -937,31 +939,33 @@ public void endVerification(String seed) { protected final AtomicLong latestKnownRepoGen = new AtomicLong(RepositoryData.EMPTY_REPO_GEN); @Override - public RepositoryData getRepositoryData() { - // Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository. - while (true) { - final long generation; - try { - generation = latestIndexBlobId(); - } catch (IOException ioe) { - throw new RepositoryException(metadata.name(), "Could not determine repository generation from root blobs", ioe); - } - final long genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, generation)); - if (genToLoad > generation) { - logger.info("Determined repository generation [" + generation - + "] from repository contents but correct generation must be at least [" + genToLoad + "]"); - } - try { - return getRepositoryData(genToLoad); - } catch (RepositoryException e) { - if (genToLoad != latestKnownRepoGen.get()) { - logger.warn("Failed to load repository data generation [" + genToLoad + - "] because a concurrent operation moved the current generation to [" + latestKnownRepoGen.get() + "]", e); - continue; + public void getRepositoryData(ActionListener listener) { + ActionListener.completeWith(listener, () -> { + // Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository. + while (true) { + final long generation; + try { + generation = latestIndexBlobId(); + } catch (IOException ioe) { + throw new RepositoryException(metadata.name(), "Could not determine repository generation from root blobs", ioe); + } + final long genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, generation)); + if (genToLoad > generation) { + logger.info("Determined repository generation [" + generation + + "] from repository contents but correct generation must be at least [" + genToLoad + "]"); + } + try { + return getRepositoryData(genToLoad); + } catch (RepositoryException e) { + if (genToLoad != latestKnownRepoGen.get()) { + logger.warn("Failed to load repository data generation [" + genToLoad + + "] because a concurrent operation moved the current generation to [" + latestKnownRepoGen.get() + "]", e); + continue; + } + throw e; } - throw e; } - } + }); } private RepositoryData getRepositoryData(long indexGen) { diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java index 54ecb3e0fb3d0..9797c58b6f29a 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -27,6 +27,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.StepListener; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; @@ -162,7 +163,7 @@ public RestoreService(ClusterService clusterService, RepositoriesService reposit this.metaDataIndexUpgradeService = metaDataIndexUpgradeService; clusterService.addStateApplier(this); this.clusterSettings = clusterSettings; - this.cleanRestoreStateTaskExecutor = new CleanRestoreStateTaskExecutor(logger); + this.cleanRestoreStateTaskExecutor = new CleanRestoreStateTaskExecutor(); } /** @@ -176,366 +177,372 @@ public void restoreSnapshot(final RestoreSnapshotRequest request, final ActionLi // Read snapshot info and metadata from the repository final String repositoryName = request.repository(); Repository repository = repositoriesService.repository(repositoryName); - final RepositoryData repositoryData = repository.getRepositoryData(); - final String snapshotName = request.snapshot(); - final Optional matchingSnapshotId = repositoryData.getSnapshotIds().stream() - .filter(s -> snapshotName.equals(s.getName())).findFirst(); - if (matchingSnapshotId.isPresent() == false) { - throw new SnapshotRestoreException(repositoryName, snapshotName, "snapshot does not exist"); - } + final StepListener repositoryDataListener = new StepListener<>(); + repository.getRepositoryData(repositoryDataListener); + repositoryDataListener.whenComplete(repositoryData -> { + final String snapshotName = request.snapshot(); + final Optional matchingSnapshotId = repositoryData.getSnapshotIds().stream() + .filter(s -> snapshotName.equals(s.getName())).findFirst(); + if (matchingSnapshotId.isPresent() == false) { + throw new SnapshotRestoreException(repositoryName, snapshotName, "snapshot does not exist"); + } - final SnapshotId snapshotId = matchingSnapshotId.get(); - final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId); - final Snapshot snapshot = new Snapshot(repositoryName, snapshotId); + final SnapshotId snapshotId = matchingSnapshotId.get(); + final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId); + final Snapshot snapshot = new Snapshot(repositoryName, snapshotId); - // Make sure that we can restore from this snapshot - validateSnapshotRestorable(repositoryName, snapshotInfo); + // Make sure that we can restore from this snapshot + validateSnapshotRestorable(repositoryName, snapshotInfo); - // Resolve the indices from the snapshot that need to be restored - final List indicesInSnapshot = filterIndices(snapshotInfo.indices(), request.indices(), request.indicesOptions()); + // Resolve the indices from the snapshot that need to be restored + final List indicesInSnapshot = filterIndices(snapshotInfo.indices(), request.indices(), request.indicesOptions()); - final MetaData.Builder metaDataBuilder; - if (request.includeGlobalState()) { - metaDataBuilder = MetaData.builder(repository.getSnapshotGlobalMetaData(snapshotId)); - } else { - metaDataBuilder = MetaData.builder(); - } + final MetaData.Builder metaDataBuilder; + if (request.includeGlobalState()) { + metaDataBuilder = MetaData.builder(repository.getSnapshotGlobalMetaData(snapshotId)); + } else { + metaDataBuilder = MetaData.builder(); + } - final List indexIdsInSnapshot = repositoryData.resolveIndices(indicesInSnapshot); - for (IndexId indexId : indexIdsInSnapshot) { - metaDataBuilder.put(repository.getSnapshotIndexMetaData(snapshotId, indexId), false); - } + final List indexIdsInSnapshot = repositoryData.resolveIndices(indicesInSnapshot); + for (IndexId indexId : indexIdsInSnapshot) { + metaDataBuilder.put(repository.getSnapshotIndexMetaData(snapshotId, indexId), false); + } - final MetaData metaData = metaDataBuilder.build(); - - // Apply renaming on index names, returning a map of names where - // the key is the renamed index and the value is the original name - final Map indices = renamedIndices(request, indicesInSnapshot); - - // Now we can start the actual restore process by adding shards to be recovered in the cluster state - // and updating cluster metadata (global and index) as needed - clusterService.submitStateUpdateTask("restore_snapshot[" + snapshotName + ']', new ClusterStateUpdateTask() { - String restoreUUID = UUIDs.randomBase64UUID(); - RestoreInfo restoreInfo = null; - - @Override - public ClusterState execute(ClusterState currentState) { - RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE); - if (currentState.getNodes().getMinNodeVersion().before(Version.V_7_0_0)) { - // Check if another restore process is already running - cannot run two restore processes at the - // same time in versions prior to 7.0 - if (restoreInProgress != null && restoreInProgress.isEmpty() == false) { - throw new ConcurrentSnapshotExecutionException(snapshot, "Restore process is already running in this cluster"); + final MetaData metaData = metaDataBuilder.build(); + + // Apply renaming on index names, returning a map of names where + // the key is the renamed index and the value is the original name + final Map indices = renamedIndices(request, indicesInSnapshot); + + // Now we can start the actual restore process by adding shards to be recovered in the cluster state + // and updating cluster metadata (global and index) as needed + clusterService.submitStateUpdateTask("restore_snapshot[" + snapshotName + ']', new ClusterStateUpdateTask() { + final String restoreUUID = UUIDs.randomBase64UUID(); + RestoreInfo restoreInfo = null; + + @Override + public ClusterState execute(ClusterState currentState) { + RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE); + if (currentState.getNodes().getMinNodeVersion().before(Version.V_7_0_0)) { + // Check if another restore process is already running - cannot run two restore processes at the + // same time in versions prior to 7.0 + if (restoreInProgress != null && restoreInProgress.isEmpty() == false) { + throw new ConcurrentSnapshotExecutionException(snapshot, + "Restore process is already running in this cluster"); + } + } + // Check if the snapshot to restore is currently being deleted + SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE); + if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) { + throw new ConcurrentSnapshotExecutionException(snapshot, + "cannot restore a snapshot while a snapshot deletion is in-progress [" + + deletionsInProgress.getEntries().get(0).getSnapshot() + "]"); } - } - // Check if the snapshot to restore is currently being deleted - SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE); - if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) { - throw new ConcurrentSnapshotExecutionException(snapshot, - "cannot restore a snapshot while a snapshot deletion is in-progress [" + - deletionsInProgress.getEntries().get(0).getSnapshot() + "]"); - } - // Updating cluster state - ClusterState.Builder builder = ClusterState.builder(currentState); - MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData()); - ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); - RoutingTable.Builder rtBuilder = RoutingTable.builder(currentState.routingTable()); - ImmutableOpenMap shards; - Set aliases = new HashSet<>(); - - if (indices.isEmpty() == false) { - // We have some indices to restore - ImmutableOpenMap.Builder shardsBuilder = ImmutableOpenMap.builder(); - final Version minIndexCompatibilityVersion = currentState.getNodes().getMaxNodeVersion() - .minimumIndexCompatibilityVersion(); - for (Map.Entry indexEntry : indices.entrySet()) { - String index = indexEntry.getValue(); - boolean partial = checkPartial(index); - SnapshotRecoverySource recoverySource = - new SnapshotRecoverySource(restoreUUID, snapshot, snapshotInfo.version(), index); - String renamedIndexName = indexEntry.getKey(); - IndexMetaData snapshotIndexMetaData = metaData.index(index); - snapshotIndexMetaData = updateIndexSettings(snapshotIndexMetaData, - request.indexSettings(), request.ignoreIndexSettings()); - try { - snapshotIndexMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(snapshotIndexMetaData, - minIndexCompatibilityVersion); - } catch (Exception ex) { - throw new SnapshotRestoreException(snapshot, "cannot restore index [" + index + "] because it cannot be " + - "upgraded", ex); - } - // Check that the index is closed or doesn't exist - IndexMetaData currentIndexMetaData = currentState.metaData().index(renamedIndexName); - IntSet ignoreShards = new IntHashSet(); - final Index renamedIndex; - if (currentIndexMetaData == null) { - // Index doesn't exist - create it and start recovery - // Make sure that the index we are about to create has a validate name - MetaDataCreateIndexService.validateIndexName(renamedIndexName, currentState); - createIndexService.validateIndexSettings(renamedIndexName, snapshotIndexMetaData.getSettings(), false); - IndexMetaData.Builder indexMdBuilder = IndexMetaData.builder(snapshotIndexMetaData) - .state(IndexMetaData.State.OPEN) - .index(renamedIndexName); - indexMdBuilder.settings(Settings.builder() - .put(snapshotIndexMetaData.getSettings()) - .put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID())); - MetaDataCreateIndexService.checkShardLimit(snapshotIndexMetaData.getSettings(), currentState); - if (!request.includeAliases() && !snapshotIndexMetaData.getAliases().isEmpty()) { - // Remove all aliases - they shouldn't be restored - indexMdBuilder.removeAllAliases(); - } else { - for (ObjectCursor alias : snapshotIndexMetaData.getAliases().keys()) { - aliases.add(alias.value); - } - } - IndexMetaData updatedIndexMetaData = indexMdBuilder.build(); - if (partial) { - populateIgnoredShards(index, ignoreShards); + // Updating cluster state + ClusterState.Builder builder = ClusterState.builder(currentState); + MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData()); + ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); + RoutingTable.Builder rtBuilder = RoutingTable.builder(currentState.routingTable()); + ImmutableOpenMap shards; + Set aliases = new HashSet<>(); + + if (indices.isEmpty() == false) { + // We have some indices to restore + ImmutableOpenMap.Builder shardsBuilder = + ImmutableOpenMap.builder(); + final Version minIndexCompatibilityVersion = currentState.getNodes().getMaxNodeVersion() + .minimumIndexCompatibilityVersion(); + for (Map.Entry indexEntry : indices.entrySet()) { + String index = indexEntry.getValue(); + boolean partial = checkPartial(index); + SnapshotRecoverySource recoverySource = + new SnapshotRecoverySource(restoreUUID, snapshot, snapshotInfo.version(), index); + String renamedIndexName = indexEntry.getKey(); + IndexMetaData snapshotIndexMetaData = metaData.index(index); + snapshotIndexMetaData = updateIndexSettings(snapshotIndexMetaData, + request.indexSettings(), request.ignoreIndexSettings()); + try { + snapshotIndexMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(snapshotIndexMetaData, + minIndexCompatibilityVersion); + } catch (Exception ex) { + throw new SnapshotRestoreException(snapshot, "cannot restore index [" + index + + "] because it cannot be upgraded", ex); } - rtBuilder.addAsNewRestore(updatedIndexMetaData, recoverySource, ignoreShards); - blocks.addBlocks(updatedIndexMetaData); - mdBuilder.put(updatedIndexMetaData, true); - renamedIndex = updatedIndexMetaData.getIndex(); - } else { - validateExistingIndex(currentIndexMetaData, snapshotIndexMetaData, renamedIndexName, partial); - // Index exists and it's closed - open it in metadata and start recovery - IndexMetaData.Builder indexMdBuilder = + // Check that the index is closed or doesn't exist + IndexMetaData currentIndexMetaData = currentState.metaData().index(renamedIndexName); + IntSet ignoreShards = new IntHashSet(); + final Index renamedIndex; + if (currentIndexMetaData == null) { + // Index doesn't exist - create it and start recovery + // Make sure that the index we are about to create has a validate name + MetaDataCreateIndexService.validateIndexName(renamedIndexName, currentState); + createIndexService.validateIndexSettings(renamedIndexName, snapshotIndexMetaData.getSettings(), false); + IndexMetaData.Builder indexMdBuilder = IndexMetaData.builder(snapshotIndexMetaData) + .state(IndexMetaData.State.OPEN) + .index(renamedIndexName); + indexMdBuilder.settings(Settings.builder() + .put(snapshotIndexMetaData.getSettings()) + .put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID())); + MetaDataCreateIndexService.checkShardLimit(snapshotIndexMetaData.getSettings(), currentState); + if (!request.includeAliases() && !snapshotIndexMetaData.getAliases().isEmpty()) { + // Remove all aliases - they shouldn't be restored + indexMdBuilder.removeAllAliases(); + } else { + for (ObjectCursor alias : snapshotIndexMetaData.getAliases().keys()) { + aliases.add(alias.value); + } + } + IndexMetaData updatedIndexMetaData = indexMdBuilder.build(); + if (partial) { + populateIgnoredShards(index, ignoreShards); + } + rtBuilder.addAsNewRestore(updatedIndexMetaData, recoverySource, ignoreShards); + blocks.addBlocks(updatedIndexMetaData); + mdBuilder.put(updatedIndexMetaData, true); + renamedIndex = updatedIndexMetaData.getIndex(); + } else { + validateExistingIndex(currentIndexMetaData, snapshotIndexMetaData, renamedIndexName, partial); + // Index exists and it's closed - open it in metadata and start recovery + IndexMetaData.Builder indexMdBuilder = IndexMetaData.builder(snapshotIndexMetaData).state(IndexMetaData.State.OPEN); - indexMdBuilder.version( + indexMdBuilder.version( Math.max(snapshotIndexMetaData.getVersion(), 1 + currentIndexMetaData.getVersion())); - indexMdBuilder.mappingVersion( + indexMdBuilder.mappingVersion( Math.max(snapshotIndexMetaData.getMappingVersion(), 1 + currentIndexMetaData.getMappingVersion())); - indexMdBuilder.settingsVersion( + indexMdBuilder.settingsVersion( Math.max( - snapshotIndexMetaData.getSettingsVersion(), - 1 + currentIndexMetaData.getSettingsVersion())); - indexMdBuilder.aliasesVersion( + snapshotIndexMetaData.getSettingsVersion(), + 1 + currentIndexMetaData.getSettingsVersion())); + indexMdBuilder.aliasesVersion( Math.max(snapshotIndexMetaData.getAliasesVersion(), 1 + currentIndexMetaData.getAliasesVersion())); - for (int shard = 0; shard < snapshotIndexMetaData.getNumberOfShards(); shard++) { - indexMdBuilder.primaryTerm(shard, - Math.max(snapshotIndexMetaData.primaryTerm(shard), currentIndexMetaData.primaryTerm(shard))); - } - - if (!request.includeAliases()) { - // Remove all snapshot aliases - if (!snapshotIndexMetaData.getAliases().isEmpty()) { - indexMdBuilder.removeAllAliases(); + for (int shard = 0; shard < snapshotIndexMetaData.getNumberOfShards(); shard++) { + indexMdBuilder.primaryTerm(shard, + Math.max(snapshotIndexMetaData.primaryTerm(shard), currentIndexMetaData.primaryTerm(shard))); } - /// Add existing aliases - for (ObjectCursor alias : currentIndexMetaData.getAliases().values()) { - indexMdBuilder.putAlias(alias.value); - } - } else { - for (ObjectCursor alias : snapshotIndexMetaData.getAliases().keys()) { - aliases.add(alias.value); + + if (!request.includeAliases()) { + // Remove all snapshot aliases + if (!snapshotIndexMetaData.getAliases().isEmpty()) { + indexMdBuilder.removeAllAliases(); + } + /// Add existing aliases + for (ObjectCursor alias : currentIndexMetaData.getAliases().values()) { + indexMdBuilder.putAlias(alias.value); + } + } else { + for (ObjectCursor alias : snapshotIndexMetaData.getAliases().keys()) { + aliases.add(alias.value); + } } + indexMdBuilder.settings(Settings.builder() + .put(snapshotIndexMetaData.getSettings()) + .put(IndexMetaData.SETTING_INDEX_UUID, + currentIndexMetaData.getIndexUUID())); + IndexMetaData updatedIndexMetaData = indexMdBuilder.index(renamedIndexName).build(); + rtBuilder.addAsRestore(updatedIndexMetaData, recoverySource); + blocks.updateBlocks(updatedIndexMetaData); + mdBuilder.put(updatedIndexMetaData, true); + renamedIndex = updatedIndexMetaData.getIndex(); } - indexMdBuilder.settings(Settings.builder() - .put(snapshotIndexMetaData.getSettings()) - .put(IndexMetaData.SETTING_INDEX_UUID, - currentIndexMetaData.getIndexUUID())); - IndexMetaData updatedIndexMetaData = indexMdBuilder.index(renamedIndexName).build(); - rtBuilder.addAsRestore(updatedIndexMetaData, recoverySource); - blocks.updateBlocks(updatedIndexMetaData); - mdBuilder.put(updatedIndexMetaData, true); - renamedIndex = updatedIndexMetaData.getIndex(); - } - for (int shard = 0; shard < snapshotIndexMetaData.getNumberOfShards(); shard++) { - if (!ignoreShards.contains(shard)) { - shardsBuilder.put(new ShardId(renamedIndex, shard), + for (int shard = 0; shard < snapshotIndexMetaData.getNumberOfShards(); shard++) { + if (!ignoreShards.contains(shard)) { + shardsBuilder.put(new ShardId(renamedIndex, shard), new RestoreInProgress.ShardRestoreStatus(clusterService.state().nodes().getLocalNodeId())); - } else { - shardsBuilder.put(new ShardId(renamedIndex, shard), + } else { + shardsBuilder.put(new ShardId(renamedIndex, shard), new RestoreInProgress.ShardRestoreStatus(clusterService.state().nodes().getLocalNodeId(), RestoreInProgress.State.FAILURE)); + } } } - } - shards = shardsBuilder.build(); - RestoreInProgress.Entry restoreEntry = new RestoreInProgress.Entry( - restoreUUID, snapshot, overallState(RestoreInProgress.State.INIT, shards), - Collections.unmodifiableList(new ArrayList<>(indices.keySet())), - shards - ); - RestoreInProgress.Builder restoreInProgressBuilder; - if (restoreInProgress != null) { - restoreInProgressBuilder = new RestoreInProgress.Builder(restoreInProgress); + shards = shardsBuilder.build(); + RestoreInProgress.Entry restoreEntry = new RestoreInProgress.Entry( + restoreUUID, snapshot, overallState(RestoreInProgress.State.INIT, shards), + Collections.unmodifiableList(new ArrayList<>(indices.keySet())), + shards + ); + RestoreInProgress.Builder restoreInProgressBuilder; + if (restoreInProgress != null) { + restoreInProgressBuilder = new RestoreInProgress.Builder(restoreInProgress); + } else { + restoreInProgressBuilder = new RestoreInProgress.Builder(); + } + builder.putCustom(RestoreInProgress.TYPE, restoreInProgressBuilder.add(restoreEntry).build()); } else { - restoreInProgressBuilder = new RestoreInProgress.Builder(); + shards = ImmutableOpenMap.of(); } - builder.putCustom(RestoreInProgress.TYPE, restoreInProgressBuilder.add(restoreEntry).build()); - } else { - shards = ImmutableOpenMap.of(); - } - checkAliasNameConflicts(indices, aliases); + checkAliasNameConflicts(indices, aliases); - // Restore global state if needed - if (request.includeGlobalState()) { - if (metaData.persistentSettings() != null) { - Settings settings = metaData.persistentSettings(); - clusterSettings.validateUpdate(settings); - mdBuilder.persistentSettings(settings); - } - if (metaData.templates() != null) { - // TODO: Should all existing templates be deleted first? - for (ObjectCursor cursor : metaData.templates().values()) { - mdBuilder.put(cursor.value); + // Restore global state if needed + if (request.includeGlobalState()) { + if (metaData.persistentSettings() != null) { + Settings settings = metaData.persistentSettings(); + clusterSettings.validateUpdate(settings); + mdBuilder.persistentSettings(settings); } - } - if (metaData.customs() != null) { - for (ObjectObjectCursor cursor : metaData.customs()) { - if (!RepositoriesMetaData.TYPE.equals(cursor.key)) { - // Don't restore repositories while we are working with them - // TODO: Should we restore them at the end? - mdBuilder.putCustom(cursor.key, cursor.value); + if (metaData.templates() != null) { + // TODO: Should all existing templates be deleted first? + for (ObjectCursor cursor : metaData.templates().values()) { + mdBuilder.put(cursor.value); + } + } + if (metaData.customs() != null) { + for (ObjectObjectCursor cursor : metaData.customs()) { + if (!RepositoriesMetaData.TYPE.equals(cursor.key)) { + // Don't restore repositories while we are working with them + // TODO: Should we restore them at the end? + mdBuilder.putCustom(cursor.key, cursor.value); + } } } } - } - if (completed(shards)) { - // We don't have any indices to restore - we are done - restoreInfo = new RestoreInfo(snapshotId.getName(), - Collections.unmodifiableList(new ArrayList<>(indices.keySet())), - shards.size(), - shards.size() - failedShards(shards)); - } + if (completed(shards)) { + // We don't have any indices to restore - we are done + restoreInfo = new RestoreInfo(snapshotId.getName(), + Collections.unmodifiableList(new ArrayList<>(indices.keySet())), + shards.size(), + shards.size() - failedShards(shards)); + } - RoutingTable rt = rtBuilder.build(); - ClusterState updatedState = builder.metaData(mdBuilder).blocks(blocks).routingTable(rt).build(); - return allocationService.reroute(updatedState, "restored snapshot [" + snapshot + "]"); - } + RoutingTable rt = rtBuilder.build(); + ClusterState updatedState = builder.metaData(mdBuilder).blocks(blocks).routingTable(rt).build(); + return allocationService.reroute(updatedState, "restored snapshot [" + snapshot + "]"); + } - private void checkAliasNameConflicts(Map renamedIndices, Set aliases) { - for (Map.Entry renamedIndex : renamedIndices.entrySet()) { - if (aliases.contains(renamedIndex.getKey())) { - throw new SnapshotRestoreException(snapshot, - "cannot rename index [" + renamedIndex.getValue() + "] into [" + renamedIndex.getKey() + "] because of " + - "conflict with an alias with the same name"); + private void checkAliasNameConflicts(Map renamedIndices, Set aliases) { + for (Map.Entry renamedIndex : renamedIndices.entrySet()) { + if (aliases.contains(renamedIndex.getKey())) { + throw new SnapshotRestoreException(snapshot, + "cannot rename index [" + renamedIndex.getValue() + "] into [" + renamedIndex.getKey() + + "] because of conflict with an alias with the same name"); + } } } - } - private void populateIgnoredShards(String index, IntSet ignoreShards) { - for (SnapshotShardFailure failure : snapshotInfo.shardFailures()) { - if (index.equals(failure.index())) { - ignoreShards.add(failure.shardId()); + private void populateIgnoredShards(String index, IntSet ignoreShards) { + for (SnapshotShardFailure failure : snapshotInfo.shardFailures()) { + if (index.equals(failure.index())) { + ignoreShards.add(failure.shardId()); + } } } - } - private boolean checkPartial(String index) { - // Make sure that index was fully snapshotted - if (failed(snapshotInfo, index)) { - if (request.partial()) { - return true; + private boolean checkPartial(String index) { + // Make sure that index was fully snapshotted + if (failed(snapshotInfo, index)) { + if (request.partial()) { + return true; + } else { + throw new SnapshotRestoreException(snapshot, "index [" + index + "] wasn't fully snapshotted - cannot " + + "restore"); + } } else { - throw new SnapshotRestoreException(snapshot, "index [" + index + "] wasn't fully snapshotted - cannot " + - "restore"); + return false; } - } else { - return false; } - } - private void validateExistingIndex(IndexMetaData currentIndexMetaData, IndexMetaData snapshotIndexMetaData, - String renamedIndex, boolean partial) { - // Index exist - checking that it's closed - if (currentIndexMetaData.getState() != IndexMetaData.State.CLOSE) { - // TODO: Enable restore for open indices - throw new SnapshotRestoreException(snapshot, "cannot restore index [" + renamedIndex + "] because an open index " + - "with same name already exists in the cluster. Either close or delete the existing index or restore the " + - "index under a different name by providing a rename pattern and replacement name"); - } - // Index exist - checking if it's partial restore - if (partial) { - throw new SnapshotRestoreException(snapshot, "cannot restore partial index [" + renamedIndex + "] because such " + - "index already exists"); - } - // Make sure that the number of shards is the same. That's the only thing that we cannot change - if (currentIndexMetaData.getNumberOfShards() != snapshotIndexMetaData.getNumberOfShards()) { - throw new SnapshotRestoreException(snapshot, - "cannot restore index [" + renamedIndex + "] with [" + currentIndexMetaData.getNumberOfShards() + "] shards " + - "from a snapshot of index [" + snapshotIndexMetaData.getIndex().getName() + "] with [" + - snapshotIndexMetaData.getNumberOfShards() + "] shards"); + private void validateExistingIndex(IndexMetaData currentIndexMetaData, IndexMetaData snapshotIndexMetaData, + String renamedIndex, boolean partial) { + // Index exist - checking that it's closed + if (currentIndexMetaData.getState() != IndexMetaData.State.CLOSE) { + // TODO: Enable restore for open indices + throw new SnapshotRestoreException(snapshot, "cannot restore index [" + renamedIndex + + "] because an open index " + + "with same name already exists in the cluster. Either close or delete the existing index or restore the " + + "index under a different name by providing a rename pattern and replacement name"); + } + // Index exist - checking if it's partial restore + if (partial) { + throw new SnapshotRestoreException(snapshot, "cannot restore partial index [" + renamedIndex + + "] because such index already exists"); + } + // Make sure that the number of shards is the same. That's the only thing that we cannot change + if (currentIndexMetaData.getNumberOfShards() != snapshotIndexMetaData.getNumberOfShards()) { + throw new SnapshotRestoreException(snapshot, + "cannot restore index [" + renamedIndex + "] with [" + currentIndexMetaData.getNumberOfShards() + + "] shards from a snapshot of index [" + snapshotIndexMetaData.getIndex().getName() + "] with [" + + snapshotIndexMetaData.getNumberOfShards() + "] shards"); + } } - } - /** - * Optionally updates index settings in indexMetaData by removing settings listed in ignoreSettings and - * merging them with settings in changeSettings. - */ - private IndexMetaData updateIndexSettings(IndexMetaData indexMetaData, Settings changeSettings, String[] ignoreSettings) { - Settings normalizedChangeSettings = Settings.builder() - .put(changeSettings) - .normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX) - .build(); - IndexMetaData.Builder builder = IndexMetaData.builder(indexMetaData); - Settings settings = indexMetaData.getSettings(); - Set keyFilters = new HashSet<>(); - List simpleMatchPatterns = new ArrayList<>(); - for (String ignoredSetting : ignoreSettings) { - if (!Regex.isSimpleMatchPattern(ignoredSetting)) { - if (UNREMOVABLE_SETTINGS.contains(ignoredSetting)) { - throw new SnapshotRestoreException(snapshot, "cannot remove setting [" + ignoredSetting + "] on restore"); + /** + * Optionally updates index settings in indexMetaData by removing settings listed in ignoreSettings and + * merging them with settings in changeSettings. + */ + private IndexMetaData updateIndexSettings(IndexMetaData indexMetaData, Settings changeSettings, + String[] ignoreSettings) { + Settings normalizedChangeSettings = Settings.builder() + .put(changeSettings) + .normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX) + .build(); + IndexMetaData.Builder builder = IndexMetaData.builder(indexMetaData); + Settings settings = indexMetaData.getSettings(); + Set keyFilters = new HashSet<>(); + List simpleMatchPatterns = new ArrayList<>(); + for (String ignoredSetting : ignoreSettings) { + if (!Regex.isSimpleMatchPattern(ignoredSetting)) { + if (UNREMOVABLE_SETTINGS.contains(ignoredSetting)) { + throw new SnapshotRestoreException( + snapshot, "cannot remove setting [" + ignoredSetting + "] on restore"); + } else { + keyFilters.add(ignoredSetting); + } } else { - keyFilters.add(ignoredSetting); + simpleMatchPatterns.add(ignoredSetting); } - } else { - simpleMatchPatterns.add(ignoredSetting); } - } - Predicate settingsFilter = k -> { - if (UNREMOVABLE_SETTINGS.contains(k) == false) { - for (String filterKey : keyFilters) { - if (k.equals(filterKey)) { - return false; + Predicate settingsFilter = k -> { + if (UNREMOVABLE_SETTINGS.contains(k) == false) { + for (String filterKey : keyFilters) { + if (k.equals(filterKey)) { + return false; + } } - } - for (String pattern : simpleMatchPatterns) { - if (Regex.simpleMatch(pattern, k)) { - return false; + for (String pattern : simpleMatchPatterns) { + if (Regex.simpleMatch(pattern, k)) { + return false; + } } } - } - return true; - }; - Settings.Builder settingsBuilder = Settings.builder() - .put(settings.filter(settingsFilter)) - .put(normalizedChangeSettings.filter(k -> { - if (UNMODIFIABLE_SETTINGS.contains(k)) { - throw new SnapshotRestoreException(snapshot, "cannot modify setting [" + k + "] on restore"); - } else { - return true; - } - })); - settingsBuilder.remove(MetaDataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING.getKey()); - return builder.settings(settingsBuilder).build(); - } - - @Override - public void onFailure(String source, Exception e) { - logger.warn(() -> new ParameterizedMessage("[{}] failed to restore snapshot", snapshotId), e); - listener.onFailure(e); - } - - @Override - public TimeValue timeout() { - return request.masterNodeTimeout(); - } + return true; + }; + Settings.Builder settingsBuilder = Settings.builder() + .put(settings.filter(settingsFilter)) + .put(normalizedChangeSettings.filter(k -> { + if (UNMODIFIABLE_SETTINGS.contains(k)) { + throw new SnapshotRestoreException(snapshot, "cannot modify setting [" + k + "] on restore"); + } else { + return true; + } + })); + settingsBuilder.remove(MetaDataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING.getKey()); + return builder.settings(settingsBuilder).build(); + } - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - listener.onResponse(new RestoreCompletionResponse(restoreUUID, snapshot, restoreInfo)); - } - }); + @Override + public void onFailure(String source, Exception e) { + logger.warn(() -> new ParameterizedMessage("[{}] failed to restore snapshot", snapshotId), e); + listener.onFailure(e); + } + @Override + public TimeValue timeout() { + return request.masterNodeTimeout(); + } + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + listener.onResponse(new RestoreCompletionResponse(restoreUUID, snapshot, restoreInfo)); + } + }); + }, listener::onFailure); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("[{}] failed to restore snapshot", request.repository() + ":" + request.snapshot()), e); @@ -599,7 +606,8 @@ public RestoreInfo getRestoreInfo() { } public static class RestoreInProgressUpdater extends RoutingChangesObserver.AbstractRoutingChangesObserver { - private final Map shardChanges = new HashMap<>(); + // Map of RestoreUUID to a of changes to the shards' restore statuses + private final Map> shardChanges = new HashMap<>(); @Override public void shardStarted(ShardRouting initializingShard, ShardRouting startedShard) { @@ -607,7 +615,7 @@ public void shardStarted(ShardRouting initializingShard, ShardRouting startedSha if (initializingShard.primary()) { RecoverySource recoverySource = initializingShard.recoverySource(); if (recoverySource.getType() == RecoverySource.Type.SNAPSHOT) { - changes(recoverySource).shards.put( + changes(recoverySource).put( initializingShard.shardId(), new ShardRestoreStatus(initializingShard.currentNodeId(), RestoreInProgress.State.SUCCESS)); } @@ -623,7 +631,7 @@ public void shardFailed(ShardRouting failedShard, UnassignedInfo unassignedInfo) // to restore this shard on another node if the snapshot files are corrupt. In case where a node just left or crashed, // however, we only want to acknowledge the restore operation once it has been successfully restored on another node. if (unassignedInfo.getFailure() != null && Lucene.isCorruptionException(unassignedInfo.getFailure().getCause())) { - changes(recoverySource).shards.put( + changes(recoverySource).put( failedShard.shardId(), new ShardRestoreStatus(failedShard.currentNodeId(), RestoreInProgress.State.FAILURE, unassignedInfo.getFailure().getCause().getMessage())); } @@ -636,7 +644,7 @@ public void shardInitialized(ShardRouting unassignedShard, ShardRouting initiali // if we force an empty primary, we should also fail the restore entry if (unassignedShard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT && initializedShard.recoverySource().getType() != RecoverySource.Type.SNAPSHOT) { - changes(unassignedShard.recoverySource()).shards.put( + changes(unassignedShard.recoverySource()).put( unassignedShard.shardId(), new ShardRestoreStatus(null, RestoreInProgress.State.FAILURE, "recovery source type changed from snapshot to " + initializedShard.recoverySource()) @@ -650,7 +658,7 @@ public void unassignedInfoUpdated(ShardRouting unassignedShard, UnassignedInfo n if (recoverySource.getType() == RecoverySource.Type.SNAPSHOT) { if (newUnassignedInfo.getLastAllocationStatus() == UnassignedInfo.AllocationStatus.DECIDERS_NO) { String reason = "shard could not be allocated to any of the nodes"; - changes(recoverySource).shards.put( + changes(recoverySource).put( unassignedShard.shardId(), new ShardRestoreStatus(unassignedShard.currentNodeId(), RestoreInProgress.State.FAILURE, reason)); } @@ -661,24 +669,20 @@ public void unassignedInfoUpdated(ShardRouting unassignedShard, UnassignedInfo n * Helper method that creates update entry for the given recovery source's restore uuid * if such an entry does not exist yet. */ - private Updates changes(RecoverySource recoverySource) { + private Map changes(RecoverySource recoverySource) { assert recoverySource.getType() == RecoverySource.Type.SNAPSHOT; - return shardChanges.computeIfAbsent(((SnapshotRecoverySource) recoverySource).restoreUUID(), k -> new Updates()); - } - - private static class Updates { - private Map shards = new HashMap<>(); + return shardChanges.computeIfAbsent(((SnapshotRecoverySource) recoverySource).restoreUUID(), k -> new HashMap<>()); } public RestoreInProgress applyChanges(final RestoreInProgress oldRestore) { if (shardChanges.isEmpty() == false) { RestoreInProgress.Builder builder = new RestoreInProgress.Builder(); for (RestoreInProgress.Entry entry : oldRestore) { - Updates updates = shardChanges.get(entry.uuid()); + Map updates = shardChanges.get(entry.uuid()); ImmutableOpenMap shardStates = entry.shards(); - if (updates != null && updates.shards.isEmpty() == false) { + if (updates != null && updates.isEmpty() == false) { ImmutableOpenMap.Builder shardsBuilder = ImmutableOpenMap.builder(shardStates); - for (Map.Entry shard : updates.shards.entrySet()) { + for (Map.Entry shard : updates.entrySet()) { ShardId shardId = shard.getKey(); ShardRestoreStatus status = shardStates.get(shardId); if (status == null || status.state().completed() == false) { @@ -724,14 +728,8 @@ public String toString() { } } - private final Logger logger; - - CleanRestoreStateTaskExecutor(Logger logger) { - this.logger = logger; - } - @Override - public ClusterTasksResult execute(final ClusterState currentState, final List tasks) throws Exception { + public ClusterTasksResult execute(final ClusterState currentState, final List tasks) { final ClusterTasksResult.Builder resultBuilder = ClusterTasksResult.builder().successes(tasks); Set completedRestores = tasks.stream().map(e -> e.uuid).collect(Collectors.toSet()); RestoreInProgress.Builder restoreInProgressBuilder = new RestoreInProgress.Builder(); @@ -786,8 +784,8 @@ private void cleanupRestoreState(ClusterChangedEvent event) { } } - public static RestoreInProgress.State overallState(RestoreInProgress.State nonCompletedState, - ImmutableOpenMap shards) { + private static RestoreInProgress.State overallState(RestoreInProgress.State nonCompletedState, + ImmutableOpenMap shards) { boolean hasFailed = false; for (ObjectCursor status : shards.values()) { if (!status.value.state().completed()) { @@ -823,7 +821,7 @@ public static int failedShards(ImmutableOpenMap renamedIndices(RestoreSnapshotRequest request, List filteredIndices) { + private static Map renamedIndices(RestoreSnapshotRequest request, List filteredIndices) { Map renamedIndices = new HashMap<>(); for (String index : filteredIndices) { String renamedIndex = index; @@ -845,7 +843,7 @@ private Map renamedIndices(RestoreSnapshotRequest request, List< * @param repository repository name * @param snapshotInfo snapshot metadata */ - private void validateSnapshotRestorable(final String repository, final SnapshotInfo snapshotInfo) { + private static void validateSnapshotRestorable(final String repository, final SnapshotInfo snapshotInfo) { if (!snapshotInfo.state().restorable()) { throw new SnapshotRestoreException(new Snapshot(repository, snapshotInfo.snapshotId()), "unsupported snapshot state [" + snapshotInfo.state() + "]"); @@ -857,7 +855,7 @@ private void validateSnapshotRestorable(final String repository, final SnapshotI } } - private boolean failed(SnapshotInfo snapshot, String index) { + private static boolean failed(SnapshotInfo snapshot, String index) { for (SnapshotShardFailure failure : snapshot.shardFailures()) { if (index.equals(failure.index())) { return true; diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 77af75cb4553a..e4f7580f09b5c 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -29,7 +29,9 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.StepListener; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateApplier; @@ -166,7 +168,7 @@ public SnapshotsService(Settings settings, ClusterService clusterService, IndexN public RepositoryData getRepositoryData(final String repositoryName) { Repository repository = repositoriesService.repository(repositoryName); assert repository != null; // should only be called once we've validated the repository exists - return repository.getRepositoryData(); + return PlainActionFuture.get(repository::getRepositoryData); } /** @@ -266,86 +268,88 @@ public void createSnapshot(final CreateSnapshotRequest request, final ActionList final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.snapshot()); validate(repositoryName, snapshotName); final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot - final RepositoryData repositoryData = repositoriesService.repository(repositoryName).getRepositoryData(); - - clusterService.submitStateUpdateTask("create_snapshot [" + snapshotName + ']', new ClusterStateUpdateTask() { - - private SnapshotsInProgress.Entry newSnapshot = null; - - @Override - public ClusterState execute(ClusterState currentState) { - validate(repositoryName, snapshotName, currentState); - SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE); - if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) { - throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, - "cannot snapshot while a snapshot deletion is in-progress in [" + deletionsInProgress + "]"); - } - final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE); - if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) { - throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, - "cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]"); - } - SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); - if (snapshots == null || snapshots.entries().isEmpty()) { - // Store newSnapshot here to be processed in clusterStateProcessed - List indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, - request.indicesOptions(), request.indices())); - logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices); - List snapshotIndices = repositoryData.resolveNewIndices(indices); - newSnapshot = new SnapshotsInProgress.Entry( - new Snapshot(repositoryName, snapshotId), - request.includeGlobalState(), request.partial(), - State.INIT, - snapshotIndices, - threadPool.absoluteTimeInMillis(), - repositoryData.getGenId(), - null, - request.userMetadata(), - clusterService.state().nodes().getMinNodeVersion().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION)); - initializingSnapshots.add(newSnapshot.snapshot()); - snapshots = new SnapshotsInProgress(newSnapshot); - } else { - throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, " a snapshot is already running"); + final StepListener repositoryDataListener = new StepListener<>(); + repositoriesService.repository(repositoryName).getRepositoryData(repositoryDataListener); + repositoryDataListener.whenComplete(repositoryData -> { + clusterService.submitStateUpdateTask("create_snapshot [" + snapshotName + ']', new ClusterStateUpdateTask() { + + private SnapshotsInProgress.Entry newSnapshot = null; + + @Override + public ClusterState execute(ClusterState currentState) { + validate(repositoryName, snapshotName, currentState); + SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE); + if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) { + throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, + "cannot snapshot while a snapshot deletion is in-progress in [" + deletionsInProgress + "]"); + } + final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE); + if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) { + throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, + "cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]"); + } + SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); + if (snapshots == null || snapshots.entries().isEmpty()) { + // Store newSnapshot here to be processed in clusterStateProcessed + List indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, + request.indicesOptions(), request.indices())); + logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices); + List snapshotIndices = repositoryData.resolveNewIndices(indices); + newSnapshot = new SnapshotsInProgress.Entry( + new Snapshot(repositoryName, snapshotId), + request.includeGlobalState(), request.partial(), + State.INIT, + snapshotIndices, + threadPool.absoluteTimeInMillis(), + repositoryData.getGenId(), + null, + request.userMetadata(), + clusterService.state().nodes().getMinNodeVersion().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION)); + initializingSnapshots.add(newSnapshot.snapshot()); + snapshots = new SnapshotsInProgress(newSnapshot); + } else { + throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, " a snapshot is already running"); + } + return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build(); } - return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build(); - } - @Override - public void onFailure(String source, Exception e) { - logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to create snapshot", repositoryName, snapshotName), e); - if (newSnapshot != null) { - initializingSnapshots.remove(newSnapshot.snapshot()); + @Override + public void onFailure(String source, Exception e) { + logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to create snapshot", repositoryName, snapshotName), e); + if (newSnapshot != null) { + initializingSnapshots.remove(newSnapshot.snapshot()); + } + newSnapshot = null; + listener.onFailure(e); } - newSnapshot = null; - listener.onFailure(e); - } - @Override - public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) { - if (newSnapshot != null) { - final Snapshot current = newSnapshot.snapshot(); - assert initializingSnapshots.contains(current); - beginSnapshot(newState, newSnapshot, request.partial(), new ActionListener() { - @Override - public void onResponse(final Snapshot snapshot) { - initializingSnapshots.remove(snapshot); - listener.onResponse(snapshot); - } + @Override + public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) { + if (newSnapshot != null) { + final Snapshot current = newSnapshot.snapshot(); + assert initializingSnapshots.contains(current); + beginSnapshot(newState, newSnapshot, request.partial(), new ActionListener() { + @Override + public void onResponse(final Snapshot snapshot) { + initializingSnapshots.remove(snapshot); + listener.onResponse(snapshot); + } - @Override - public void onFailure(final Exception e) { - initializingSnapshots.remove(current); - listener.onFailure(e); - } - }); + @Override + public void onFailure(final Exception e) { + initializingSnapshots.remove(current); + listener.onFailure(e); + } + }); + } } - } - @Override - public TimeValue timeout() { - return request.masterNodeTimeout(); - } - }); + @Override + public TimeValue timeout() { + return request.masterNodeTimeout(); + } + }); + }, listener::onFailure); } /** @@ -418,111 +422,116 @@ protected void doRun() { throw new RepositoryException(repository.getMetadata().name(), "cannot create snapshot in a readonly repository"); } final String snapshotName = snapshot.snapshot().getSnapshotId().getName(); - final RepositoryData repositoryData = repository.getRepositoryData(); - // check if the snapshot name already exists in the repository - if (repositoryData.getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) { - throw new InvalidSnapshotNameException( - repository.getMetadata().name(), snapshotName, "snapshot with the same name already exists"); - } - if (clusterState.nodes().getMinNodeVersion().onOrAfter(NO_REPO_INITIALIZE_VERSION) == false) { - // In mixed version clusters we initialize the snapshot in the repository so that in case of a master failover to an - // older version master node snapshot finalization (that assumes initializeSnapshot was called) produces a valid - // snapshot. - repository.initializeSnapshot( - snapshot.snapshot().getSnapshotId(), snapshot.indices(), metaDataForSnapshot(snapshot, clusterState.metaData())); - } - snapshotCreated = true; - - logger.info("snapshot [{}] started", snapshot.snapshot()); - if (snapshot.indices().isEmpty()) { - // No indices in this snapshot - we are done - userCreateSnapshotListener.onResponse(snapshot.snapshot()); - endSnapshot(snapshot, clusterState.metaData()); - return; - } - clusterService.submitStateUpdateTask("update_snapshot [" + snapshot.snapshot() + "]", new ClusterStateUpdateTask() { - - @Override - public ClusterState execute(ClusterState currentState) { - SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); - List entries = new ArrayList<>(); - for (SnapshotsInProgress.Entry entry : snapshots.entries()) { - if (entry.snapshot().equals(snapshot.snapshot()) == false) { - entries.add(entry); - continue; - } + final StepListener repositoryDataListener = new StepListener<>(); + repository.getRepositoryData(repositoryDataListener); + repositoryDataListener.whenComplete(repositoryData -> { + // check if the snapshot name already exists in the repository + if (repositoryData.getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) { + throw new InvalidSnapshotNameException( + repository.getMetadata().name(), snapshotName, "snapshot with the same name already exists"); + } + if (clusterState.nodes().getMinNodeVersion().onOrAfter(NO_REPO_INITIALIZE_VERSION) == false) { + // In mixed version clusters we initialize the snapshot in the repository so that in case of a master failover to an + // older version master node snapshot finalization (that assumes initializeSnapshot was called) produces a valid + // snapshot. + repository.initializeSnapshot( + snapshot.snapshot().getSnapshotId(), snapshot.indices(), + metaDataForSnapshot(snapshot, clusterState.metaData())); + } + snapshotCreated = true; - if (entry.state() == State.ABORTED) { - entries.add(entry); - assert entry.shards().isEmpty(); - hadAbortedInitializations = true; - } else { - // Replace the snapshot that was just initialized - ImmutableOpenMap shards = shards(currentState, entry, repositoryData); - if (!partial) { - Tuple, Set> indicesWithMissingShards = indicesWithMissingShards(shards, - currentState.metaData()); - Set missing = indicesWithMissingShards.v1(); - Set closed = indicesWithMissingShards.v2(); - if (missing.isEmpty() == false || closed.isEmpty() == false) { - final StringBuilder failureMessage = new StringBuilder(); - if (missing.isEmpty() == false) { - failureMessage.append("Indices don't have primary shards "); - failureMessage.append(missing); - } - if (closed.isEmpty() == false) { - if (failureMessage.length() > 0) { - failureMessage.append("; "); + logger.info("snapshot [{}] started", snapshot.snapshot()); + if (snapshot.indices().isEmpty()) { + // No indices in this snapshot - we are done + userCreateSnapshotListener.onResponse(snapshot.snapshot()); + endSnapshot(snapshot, clusterState.metaData()); + return; + } + clusterService.submitStateUpdateTask("update_snapshot [" + snapshot.snapshot() + "]", new ClusterStateUpdateTask() { + + @Override + public ClusterState execute(ClusterState currentState) { + SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); + List entries = new ArrayList<>(); + for (SnapshotsInProgress.Entry entry : snapshots.entries()) { + if (entry.snapshot().equals(snapshot.snapshot()) == false) { + entries.add(entry); + continue; + } + + if (entry.state() == State.ABORTED) { + entries.add(entry); + assert entry.shards().isEmpty(); + hadAbortedInitializations = true; + } else { + // Replace the snapshot that was just initialized + ImmutableOpenMap shards = shards(currentState, entry, repositoryData); + if (!partial) { + Tuple, Set> indicesWithMissingShards = indicesWithMissingShards(shards, + currentState.metaData()); + Set missing = indicesWithMissingShards.v1(); + Set closed = indicesWithMissingShards.v2(); + if (missing.isEmpty() == false || closed.isEmpty() == false) { + final StringBuilder failureMessage = new StringBuilder(); + if (missing.isEmpty() == false) { + failureMessage.append("Indices don't have primary shards "); + failureMessage.append(missing); } - failureMessage.append("Indices are closed "); - failureMessage.append(closed); + if (closed.isEmpty() == false) { + if (failureMessage.length() > 0) { + failureMessage.append("; "); + } + failureMessage.append("Indices are closed "); + failureMessage.append(closed); + } + entries.add( + new SnapshotsInProgress.Entry(entry, State.FAILED, shards, failureMessage.toString())); + continue; } - entries.add(new SnapshotsInProgress.Entry(entry, State.FAILED, shards, failureMessage.toString())); - continue; } + entries.add(new SnapshotsInProgress.Entry(entry, State.STARTED, shards)); } - entries.add(new SnapshotsInProgress.Entry(entry, State.STARTED, shards)); } + return ClusterState.builder(currentState) + .putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(unmodifiableList(entries))) + .build(); } - return ClusterState.builder(currentState) - .putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(unmodifiableList(entries))) - .build(); - } - - @Override - public void onFailure(String source, Exception e) { - logger.warn(() -> new ParameterizedMessage("[{}] failed to create snapshot", - snapshot.snapshot().getSnapshotId()), e); - removeSnapshotFromClusterState(snapshot.snapshot(), null, e, - new CleanupAfterErrorListener(snapshot, true, userCreateSnapshotListener, e)); - } - @Override - public void onNoLongerMaster(String source) { - // We are not longer a master - we shouldn't try to do any cleanup - // The new master will take care of it - logger.warn("[{}] failed to create snapshot - no longer a master", snapshot.snapshot().getSnapshotId()); - userCreateSnapshotListener.onFailure( - new SnapshotException(snapshot.snapshot(), "master changed during snapshot initialization")); - } + @Override + public void onFailure(String source, Exception e) { + logger.warn(() -> new ParameterizedMessage("[{}] failed to create snapshot", + snapshot.snapshot().getSnapshotId()), e); + removeSnapshotFromClusterState(snapshot.snapshot(), null, e, + new CleanupAfterErrorListener(snapshot, true, userCreateSnapshotListener, e)); + } - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - // The userCreateSnapshotListener.onResponse() notifies caller that the snapshot was accepted - // for processing. If client wants to wait for the snapshot completion, it can register snapshot - // completion listener in this method. For the snapshot completion to work properly, the snapshot - // should still exist when listener is registered. - userCreateSnapshotListener.onResponse(snapshot.snapshot()); + @Override + public void onNoLongerMaster(String source) { + // We are not longer a master - we shouldn't try to do any cleanup + // The new master will take care of it + logger.warn("[{}] failed to create snapshot - no longer a master", snapshot.snapshot().getSnapshotId()); + userCreateSnapshotListener.onFailure( + new SnapshotException(snapshot.snapshot(), "master changed during snapshot initialization")); + } - if (hadAbortedInitializations) { - final SnapshotsInProgress snapshotsInProgress = newState.custom(SnapshotsInProgress.TYPE); - assert snapshotsInProgress != null; - final SnapshotsInProgress.Entry entry = snapshotsInProgress.snapshot(snapshot.snapshot()); - assert entry != null; - endSnapshot(entry, newState.metaData()); + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + // The userCreateSnapshotListener.onResponse() notifies caller that the snapshot was accepted + // for processing. If client wants to wait for the snapshot completion, it can register snapshot + // completion listener in this method. For the snapshot completion to work properly, the snapshot + // should still exist when listener is registered. + userCreateSnapshotListener.onResponse(snapshot.snapshot()); + + if (hadAbortedInitializations) { + final SnapshotsInProgress snapshotsInProgress = newState.custom(SnapshotsInProgress.TYPE); + assert snapshotsInProgress != null; + final SnapshotsInProgress.Entry entry = snapshotsInProgress.snapshot(snapshot.snapshot()); + assert entry != null; + endSnapshot(entry, newState.metaData()); + } } - } - }); + }); + }, this::onFailure); } @Override @@ -1151,26 +1160,27 @@ public void deleteSnapshot(final String repositoryName, final String snapshotNam final boolean immediatePriority) { // First, look for the snapshot in the repository final Repository repository = repositoriesService.repository(repositoryName); - final RepositoryData repositoryData = repository.getRepositoryData(); - Optional matchedEntry = repositoryData.getSnapshotIds() - .stream() - .filter(s -> s.getName().equals(snapshotName)) - .findFirst(); - // if nothing found by the same name, then look in the cluster state for current in progress snapshots - long repoGenId = repositoryData.getGenId(); - if (matchedEntry.isPresent() == false) { - Optional matchedInProgress = currentSnapshots(repositoryName, Collections.emptyList()).stream() - .filter(s -> s.snapshot().getSnapshotId().getName().equals(snapshotName)).findFirst(); - if (matchedInProgress.isPresent()) { - matchedEntry = matchedInProgress.map(s -> s.snapshot().getSnapshotId()); - // Derive repository generation if a snapshot is in progress because it will increment the generation when it finishes - repoGenId = matchedInProgress.get().getRepositoryStateId() + 1L; + repository.getRepositoryData(ActionListener.wrap(repositoryData -> { + Optional matchedEntry = repositoryData.getSnapshotIds() + .stream() + .filter(s -> s.getName().equals(snapshotName)) + .findFirst(); + // if nothing found by the same name, then look in the cluster state for current in progress snapshots + long repoGenId = repositoryData.getGenId(); + if (matchedEntry.isPresent() == false) { + Optional matchedInProgress = currentSnapshots(repositoryName, Collections.emptyList()).stream() + .filter(s -> s.snapshot().getSnapshotId().getName().equals(snapshotName)).findFirst(); + if (matchedInProgress.isPresent()) { + matchedEntry = matchedInProgress.map(s -> s.snapshot().getSnapshotId()); + // Derive repository generation if a snapshot is in progress because it will increment the generation when it finishes + repoGenId = matchedInProgress.get().getRepositoryStateId() + 1L; + } } - } - if (matchedEntry.isPresent() == false) { - throw new SnapshotMissingException(repositoryName, snapshotName); - } - deleteSnapshot(new Snapshot(repositoryName, matchedEntry.get()), listener, repoGenId, immediatePriority); + if (matchedEntry.isPresent() == false) { + throw new SnapshotMissingException(repositoryName, snapshotName); + } + deleteSnapshot(new Snapshot(repositoryName, matchedEntry.get()), listener, repoGenId, immediatePriority); + }, listener::onFailure)); } /** diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index be0f8a4eb3e58..c4b3e5ffbe433 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -149,8 +149,8 @@ public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId ind } @Override - public RepositoryData getRepositoryData() { - return null; + public void getRepositoryData(ActionListener listener) { + listener.onResponse(null); } @Override diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java index 1f9c1e8cf4566..4a9430df7f686 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java @@ -131,7 +131,7 @@ public void testRetrieveSnapshots() throws Exception { (BlobStoreRepository) repositoriesService.repository(repositoryName); final List originalSnapshots = Arrays.asList(snapshotId1, snapshotId2); - List snapshotIds = repository.getRepositoryData().getSnapshotIds().stream() + List snapshotIds = ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).getSnapshotIds().stream() .sorted((s1, s2) -> s1.getName().compareTo(s2.getName())).collect(Collectors.toList()); assertThat(snapshotIds, equalTo(originalSnapshots)); } @@ -140,10 +140,10 @@ public void testReadAndWriteSnapshotsThroughIndexFile() throws Exception { final BlobStoreRepository repository = setupRepo(); // write to and read from a index file with no entries - assertThat(repository.getRepositoryData().getSnapshotIds().size(), equalTo(0)); + assertThat(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).getSnapshotIds().size(), equalTo(0)); final RepositoryData emptyData = RepositoryData.EMPTY; repository.writeIndexGen(emptyData, emptyData.getGenId(), true); - RepositoryData repoData = repository.getRepositoryData(); + RepositoryData repoData = ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository); assertEquals(repoData, emptyData); assertEquals(repoData.getIndices().size(), 0); assertEquals(repoData.getSnapshotIds().size(), 0); @@ -152,12 +152,12 @@ public void testReadAndWriteSnapshotsThroughIndexFile() throws Exception { // write to and read from an index file with snapshots but no indices repoData = addRandomSnapshotsToRepoData(repoData, false); repository.writeIndexGen(repoData, repoData.getGenId(), true); - assertEquals(repoData, repository.getRepositoryData()); + assertEquals(repoData, ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository)); // write to and read from a index file with random repository data - repoData = addRandomSnapshotsToRepoData(repository.getRepositoryData(), true); + repoData = addRandomSnapshotsToRepoData(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), true); repository.writeIndexGen(repoData, repoData.getGenId(), true); - assertEquals(repoData, repository.getRepositoryData()); + assertEquals(repoData, ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository)); } public void testIndexGenerationalFiles() throws Exception { @@ -166,22 +166,22 @@ public void testIndexGenerationalFiles() throws Exception { // write to index generational file RepositoryData repositoryData = generateRandomRepoData(); repository.writeIndexGen(repositoryData, repositoryData.getGenId(), true); - assertThat(repository.getRepositoryData(), equalTo(repositoryData)); + assertThat(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), equalTo(repositoryData)); assertThat(repository.latestIndexBlobId(), equalTo(0L)); assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(0L)); // adding more and writing to a new index generational file - repositoryData = addRandomSnapshotsToRepoData(repository.getRepositoryData(), true); + repositoryData = addRandomSnapshotsToRepoData(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), true); repository.writeIndexGen(repositoryData, repositoryData.getGenId(), true); - assertEquals(repository.getRepositoryData(), repositoryData); + assertEquals(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), repositoryData); assertThat(repository.latestIndexBlobId(), equalTo(1L)); assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(1L)); // removing a snapshot and writing to a new index generational file - repositoryData = repository.getRepositoryData().removeSnapshot( + repositoryData = ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).removeSnapshot( repositoryData.getSnapshotIds().iterator().next(), ShardGenerations.EMPTY); repository.writeIndexGen(repositoryData, repositoryData.getGenId(), true); - assertEquals(repository.getRepositoryData(), repositoryData); + assertEquals(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), repositoryData); assertThat(repository.latestIndexBlobId(), equalTo(2L)); assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(2L)); } diff --git a/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java b/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java index ed6a4c3441ede..372452fb6e36e 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -18,8 +18,8 @@ */ package org.elasticsearch.snapshots; -import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.common.settings.Settings; @@ -44,7 +44,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -93,17 +92,13 @@ protected void disableRepoConsistencyCheck(String reason) { skipRepoConsistencyCheckReason = reason; } - protected RepositoryData getRepositoryData(Repository repository) throws InterruptedException { + protected RepositoryData getRepositoryData(Repository repository) { ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, internalCluster().getMasterName()); - final SetOnce repositoryData = new SetOnce<>(); - final CountDownLatch latch = new CountDownLatch(1); + final PlainActionFuture repositoryData = PlainActionFuture.newFuture(); threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { - repositoryData.set(repository.getRepositoryData()); - latch.countDown(); + repository.getRepositoryData(repositoryData); }); - - latch.await(); - return repositoryData.get(); + return repositoryData.actionGet(); } public static long getFailureCount(String repository) { diff --git a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index c3feeb417d217..12f480bd3a3fb 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -363,7 +363,7 @@ public void testSingleGetAfterRestore() throws Exception { assertThat(client.prepareGet(restoredIndexName, typeName, docId).get().isExists(), equalTo(true)); } - public void testFreshIndexUUID() throws InterruptedException { + public void testFreshIndexUUID() { Client client = client(); logger.info("--> creating repository"); @@ -781,7 +781,7 @@ public void testIncludeGlobalState() throws Exception { assertThat(client.prepareSearch("test-idx").setSize(0).get().getHits().getTotalHits().value, equalTo(100L)); } - public void testSnapshotFileFailureDuringSnapshot() throws InterruptedException { + public void testSnapshotFileFailureDuringSnapshot() { disableRepoConsistencyCheck("This test uses a purposely broken repository so it would fail consistency checks"); Client client = client(); diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 754c264c9123b..638623f303edf 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -75,6 +75,7 @@ import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.AutoCreateIndex; import org.elasticsearch.action.support.DestructiveOperations; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; @@ -156,6 +157,7 @@ import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.repositories.fs.FsRepository; @@ -308,7 +310,7 @@ public void testSuccessfulSnapshotAndRestore() { SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE); assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false)); final Repository repository = masterNode.repositoriesService.repository(repoName); - Collection snapshotIds = repository.getRepositoryData().getSnapshotIds(); + Collection snapshotIds = getRepositoryData(repository).getSnapshotIds(); assertThat(snapshotIds, hasSize(1)); final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotIds.iterator().next()); @@ -369,7 +371,7 @@ public void testSnapshotWithNodeDisconnects() { SnapshotsInProgress finalSnapshotsInProgress = randomMaster.clusterService.state().custom(SnapshotsInProgress.TYPE); assertThat(finalSnapshotsInProgress.entries(), empty()); final Repository repository = randomMaster.repositoriesService.repository(repoName); - Collection snapshotIds = repository.getRepositoryData().getSnapshotIds(); + Collection snapshotIds = getRepositoryData(repository).getSnapshotIds(); assertThat(snapshotIds, hasSize(1)); } @@ -408,7 +410,7 @@ public void testConcurrentSnapshotCreateAndDelete() { SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE); assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false)); final Repository repository = masterNode.repositoriesService.repository(repoName); - Collection snapshotIds = repository.getRepositoryData().getSnapshotIds(); + Collection snapshotIds = getRepositoryData(repository).getSnapshotIds(); assertThat(snapshotIds, hasSize(1)); final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotIds.iterator().next()); @@ -474,7 +476,7 @@ public void testConcurrentSnapshotCreateAndDeleteOther() { SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE); assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false)); final Repository repository = masterNode.repositoriesService.repository(repoName); - Collection snapshotIds = repository.getRepositoryData().getSnapshotIds(); + Collection snapshotIds = getRepositoryData(repository).getSnapshotIds(); assertThat(snapshotIds, hasSize(thirdSnapshotResponse == null ? 2 : 3)); for (SnapshotId snapshotId : snapshotIds) { @@ -559,8 +561,8 @@ public void run() { final SnapshotsInProgress finalSnapshotsInProgress = testClusterNodes.randomDataNodeSafe() .clusterService.state().custom(SnapshotsInProgress.TYPE); assertThat(finalSnapshotsInProgress.entries(), empty()); - final Repository repository = masterNode.repositoriesService.repository(repoName); - Collection snapshotIds = repository.getRepositoryData().getSnapshotIds(); + final Repository repository = testClusterNodes.randomMasterNodeSafe().repositoriesService.repository(repoName); + Collection snapshotIds = getRepositoryData(repository).getSnapshotIds(); assertThat(snapshotIds, either(hasSize(1)).or(hasSize(0))); } @@ -633,7 +635,7 @@ public void testSuccessfulSnapshotWithConcurrentDynamicMappingUpdates() { SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE); assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false)); final Repository repository = masterNode.repositoriesService.repository(repoName); - Collection snapshotIds = repository.getRepositoryData().getSnapshotIds(); + Collection snapshotIds = getRepositoryData(repository).getSnapshotIds(); assertThat(snapshotIds, hasSize(1)); final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotIds.iterator().next()); @@ -643,6 +645,14 @@ public void testSuccessfulSnapshotWithConcurrentDynamicMappingUpdates() { assertEquals(0, snapshotInfo.failedShards()); } + private RepositoryData getRepositoryData(Repository repository) { + final PlainActionFuture res = PlainActionFuture.newFuture(); + repository.getRepositoryData(res); + deterministicTaskQueue.runAllRunnableTasks(); + assertTrue(res.isDone()); + return res.actionGet(); + } + private StepListener createRepoAndIndex(TestClusterNodes.TestClusterNode masterNode, String repoName, String index, int shards) { final AdminClient adminClient = masterNode.client.admin(); diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotStatusApisIT.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotStatusApisIT.java index 78c02b8e80bff..13d94d7eefa0c 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotStatusApisIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotStatusApisIT.java @@ -136,7 +136,7 @@ public void testExceptionOnMissingSnapBlob() throws IOException { .getSnapshots(new GetSnapshotsRequest("test-repo", new String[] {"test-snap"})).actionGet()); } - public void testExceptionOnMissingShardLevelSnapBlob() throws IOException, InterruptedException { + public void testExceptionOnMissingShardLevelSnapBlob() throws IOException { disableRepoConsistencyCheck("This test intentionally corrupts the repository"); logger.info("--> creating repository"); diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index ed7735059ef91..527a391acd0c8 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -79,6 +79,7 @@ import org.elasticsearch.indices.recovery.StartRecoveryRequest; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ESTestCase; @@ -833,7 +834,8 @@ protected String snapshotShard(final IndexShard shard, final Index index = shard.shardId().getIndex(); final IndexId indexId = new IndexId(index.getName(), index.getUUID()); final IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing( - repository.getRepositoryData().shardGenerations().getShardGen(indexId, shard.shardId().getId())); + ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).shardGenerations().getShardGen( + indexId, shard.shardId().getId())); final PlainActionFuture future = PlainActionFuture.newFuture(); final String shardGen; try (Engine.IndexCommitRef indexCommitRef = shard.acquireLastIndexCommit(true)) { diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java index 4359ed00ae0c5..31d08536f3257 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java @@ -85,10 +85,10 @@ public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId ind } @Override - public RepositoryData getRepositoryData() { + public void getRepositoryData(ActionListener listener) { final IndexId indexId = new IndexId(indexName, "blah"); - return new RepositoryData(EMPTY_REPO_GEN, Collections.emptyMap(), Collections.emptyMap(), - Collections.singletonMap(indexId, emptySet()), ShardGenerations.EMPTY); + listener.onResponse(new RepositoryData(EMPTY_REPO_GEN, Collections.emptyMap(), Collections.emptyMap(), + Collections.singletonMap(indexId, emptySet()), ShardGenerations.EMPTY)); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java index 814550be5f899..619bdaf60ab94 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java @@ -24,12 +24,14 @@ import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequestBuilder; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.Client; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.snapshots.SnapshotMissingException; import org.elasticsearch.snapshots.SnapshotRestoreException; @@ -41,7 +43,6 @@ import java.util.List; import java.util.Locale; import java.util.Set; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -58,6 +59,10 @@ */ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase { + public static RepositoryData getRepositoryData(Repository repository) { + return PlainActionFuture.get(repository::getRepositoryData); + } + protected abstract String repositoryType(); protected Settings repositorySettings() { @@ -256,16 +261,13 @@ public void testIndicesDeletedFromRepository() throws Exception { BlobStoreRepository repository = (BlobStoreRepository) repositoriesSvc.repository(repoName); final SetOnce indicesBlobContainer = new SetOnce<>(); - final SetOnce repositoryData = new SetOnce<>(); - final CountDownLatch latch = new CountDownLatch(1); + final PlainActionFuture repositoryData = PlainActionFuture.newFuture(); threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { indicesBlobContainer.set(repository.blobStore().blobContainer(repository.basePath().add("indices"))); - repositoryData.set(repository.getRepositoryData()); - latch.countDown(); + repository.getRepositoryData(repositoryData); }); - latch.await(); - for (IndexId indexId : repositoryData.get().getIndices().values()) { + for (IndexId indexId : repositoryData.actionGet().getIndices().values()) { if (indexId.getName().equals("test-idx-3")) { assertFalse(BlobStoreTestUtil.blobExists(indicesBlobContainer.get(), indexId.getId())); // deleted index } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 021e1a282045d..03f972b1d97c3 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -225,26 +225,28 @@ public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId ind } @Override - public RepositoryData getRepositoryData() { - Client remoteClient = getRemoteClusterClient(); - ClusterStateResponse response = remoteClient.admin().cluster().prepareState().clear().setMetaData(true) - .get(ccrSettings.getRecoveryActionTimeout()); - MetaData remoteMetaData = response.getState().getMetaData(); - - Map copiedSnapshotIds = new HashMap<>(); - Map snapshotStates = new HashMap<>(copiedSnapshotIds.size()); - Map> indexSnapshots = new HashMap<>(copiedSnapshotIds.size()); - - ImmutableOpenMap remoteIndices = remoteMetaData.getIndices(); - for (String indexName : remoteMetaData.getConcreteAllIndices()) { - // Both the Snapshot name and UUID are set to _latest_ - SnapshotId snapshotId = new SnapshotId(LATEST, LATEST); - copiedSnapshotIds.put(indexName, snapshotId); - snapshotStates.put(indexName, SnapshotState.SUCCESS); - Index index = remoteIndices.get(indexName).getIndex(); - indexSnapshots.put(new IndexId(indexName, index.getUUID()), Collections.singleton(snapshotId)); - } - return new RepositoryData(1, copiedSnapshotIds, snapshotStates, indexSnapshots, ShardGenerations.EMPTY); + public void getRepositoryData(ActionListener listener) { + ActionListener.completeWith(listener, () -> { + Client remoteClient = getRemoteClusterClient(); + ClusterStateResponse response = remoteClient.admin().cluster().prepareState().clear().setMetaData(true) + .get(ccrSettings.getRecoveryActionTimeout()); + MetaData remoteMetaData = response.getState().getMetaData(); + + Map copiedSnapshotIds = new HashMap<>(); + Map snapshotStates = new HashMap<>(copiedSnapshotIds.size()); + Map> indexSnapshots = new HashMap<>(copiedSnapshotIds.size()); + + ImmutableOpenMap remoteIndices = remoteMetaData.getIndices(); + for (String indexName : remoteMetaData.getConcreteAllIndices()) { + // Both the Snapshot name and UUID are set to _latest_ + SnapshotId snapshotId = new SnapshotId(LATEST, LATEST); + copiedSnapshotIds.put(indexName, snapshotId); + snapshotStates.put(indexName, SnapshotState.SUCCESS); + Index index = remoteIndices.get(indexName).getIndex(); + indexSnapshots.put(new IndexId(indexName, index.getUUID()), Collections.singleton(snapshotId)); + } + return new RepositoryData(1, copiedSnapshotIds, snapshotStates, indexSnapshots, ShardGenerations.EMPTY); + }); } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java index 3e49147db6d97..47ffc1a838f32 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java @@ -61,6 +61,7 @@ import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.ShardGenerations; +import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase; import org.elasticsearch.repositories.fs.FsRepository; import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.Matchers; @@ -212,7 +213,7 @@ public void testRestoreMinmal() throws IOException { repository.finalizeSnapshot(snapshotId, ShardGenerations.builder().put(indexId, 0, indexShardSnapshotStatus.generation()).build(), indexShardSnapshotStatus.asCopy().getStartTime(), null, 1, Collections.emptyList(), - repository.getRepositoryData().getGenId(), true, + ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).getGenId(), true, MetaData.builder().put(shard.indexSettings().getIndexMetaData(), false).build(), Collections.emptyMap(), true, finFuture);