diff --git a/server/src/internalClusterTest/java/org/elasticsearch/repositories/IndexSnapshotsServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/repositories/IndexSnapshotsServiceIT.java new file mode 100644 index 0000000000000..4c69b2bb2306f --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/repositories/IndexSnapshotsServiceIT.java @@ -0,0 +1,345 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.repositories; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; +import org.elasticsearch.action.admin.cluster.snapshots.get.shard.GetShardSnapshotAction; +import org.elasticsearch.action.admin.cluster.snapshots.get.shard.GetShardSnapshotRequest; +import org.elasticsearch.action.admin.cluster.snapshots.get.shard.GetShardSnapshotResponse; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.repositories.fs.FsRepository; +import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase; +import org.elasticsearch.snapshots.Snapshot; +import org.elasticsearch.snapshots.SnapshotInfo; +import org.elasticsearch.snapshots.SnapshotState; +import org.elasticsearch.snapshots.mockstore.MockRepository; + +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; + +import static org.elasticsearch.snapshots.SnapshotsService.NO_FEATURE_STATES_VALUE; +import static org.elasticsearch.test.VersionUtils.randomVersionBetween; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.anEmptyMap; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; + +public class IndexSnapshotsServiceIT extends AbstractSnapshotIntegTestCase { + public void testGetShardSnapshotFromUnknownRepoReturnsAnError() throws Exception { + boolean useMultipleUnknownRepositories = randomBoolean(); + List repositories = useMultipleUnknownRepositories + ? org.elasticsearch.core.List.of("unknown", "unknown-2") + : org.elasticsearch.core.List.of("unknown"); + final ActionFuture responseFuture = getLatestSnapshotForShardFuture(repositories, "idx", 0, false); + + if (useMultipleUnknownRepositories) { + GetShardSnapshotResponse response = responseFuture.get(); + assertThat(response.getRepositoryShardSnapshots(), is(anEmptyMap())); + + final Map failures = response.getRepositoryFailures(); + for (String repository : repositories) { + RepositoryException repositoryException = failures.get(repository); + assertThat(repositoryException, is(notNullValue())); + assertThat( + repositoryException.getMessage(), + equalTo(String.format(Locale.ROOT, "[%s] Unable to find the latest snapshot for shard [[idx][0]]", repository)) + ); + } + } else { + expectThrows(RepositoryException.class, responseFuture::actionGet); + } + + disableRepoConsistencyCheck("This test checks an empty repository"); + } + + public void testGetShardSnapshotFromEmptyRepositoryReturnsEmptyResult() { + final String fsRepoName = randomAlphaOfLength(10); + createRepository(fsRepoName, FsRepository.TYPE); + + final Optional indexShardSnapshotInfo = getLatestSnapshotForShard(fsRepoName, "test", 0); + assertThat(indexShardSnapshotInfo.isPresent(), equalTo(false)); + + disableRepoConsistencyCheck("This test checks an empty repository"); + } + + public void testGetShardSnapshotFromUnknownIndexReturnsEmptyResult() { + final String fsRepoName = randomAlphaOfLength(10); + createRepository(fsRepoName, FsRepository.TYPE); + + createSnapshot(fsRepoName, "snap-1", Collections.emptyList()); + + final Optional indexShardSnapshotInfo = getLatestSnapshotForShard(fsRepoName, "test", 0); + assertThat(indexShardSnapshotInfo.isPresent(), equalTo(false)); + } + + public void testGetShardSnapshotFromUnknownShardReturnsEmptyResult() { + final String fsRepoName = randomAlphaOfLength(10); + final String indexName = "test-idx"; + + createIndexWithContent(indexName); + + createRepository(fsRepoName, FsRepository.TYPE); + createSnapshot(fsRepoName, "snap-1", Collections.singletonList(indexName)); + + final Optional indexShardSnapshotInfo = getLatestSnapshotForShard(fsRepoName, indexName, 100); + assertThat(indexShardSnapshotInfo.isPresent(), equalTo(false)); + } + + public void testGetShardSnapshotOnEmptyRepositoriesListThrowsAnError() { + expectThrows(IllegalArgumentException.class, () -> getLatestSnapshotForShardFuture(Collections.emptyList(), "idx", 0, false)); + } + + public void testGetShardSnapshotReturnsTheLatestSuccessfulSnapshot() throws Exception { + final String repoName = "repo-name"; + final Path repoPath = randomRepoPath(); + createRepository(repoName, FsRepository.TYPE, repoPath); + + final boolean useBwCFormat = randomBoolean(); + if (useBwCFormat) { + final Version version = randomVersionBetween(random(), Version.V_7_5_0, Version.CURRENT); + initWithSnapshotVersion(repoName, repoPath, version); + // Re-create repo to clear repository data cache + assertAcked(clusterAdmin().prepareDeleteRepository(repoName).get()); + createRepository(repoName, "fs", repoPath); + } + + createSnapshot(repoName, "empty-snap", Collections.emptyList()); + + final String indexName = "test"; + final String indexName2 = "test-2"; + List indices = org.elasticsearch.core.List.of(indexName, indexName2); + createIndex(indexName, indexName2); + SnapshotInfo lastSnapshot = null; + int numSnapshots = randomIntBetween(5, 25); + for (int i = 0; i < numSnapshots; i++) { + if (randomBoolean()) { + indexRandomDocs(indexName, 5); + indexRandomDocs(indexName2, 10); + } + final List snapshotIndices = randomSubsetOf(indices); + final SnapshotInfo snapshotInfo = createSnapshot(repoName, String.format(Locale.ROOT, "snap-%03d", i), snapshotIndices); + if (snapshotInfo.indices().contains(indexName)) { + lastSnapshot = snapshotInfo; + } + } + + if (useBwCFormat) { + // Reload the RepositoryData so we don't use cached data that wasn't serialized + assertAcked(clusterAdmin().prepareDeleteRepository(repoName).get()); + createRepository(repoName, "fs", repoPath); + } + + final Optional indexShardSnapshotInfoOpt = getLatestSnapshotForShard(repoName, indexName, 0); + if (lastSnapshot == null) { + assertThat(indexShardSnapshotInfoOpt.isPresent(), equalTo(false)); + } else { + assertThat(indexShardSnapshotInfoOpt.isPresent(), equalTo(true)); + + final ShardSnapshotInfo shardSnapshotInfo = indexShardSnapshotInfoOpt.get(); + + final ClusterStateResponse clusterStateResponse = admin().cluster().prepareState().execute().actionGet(); + final IndexMetadata indexMetadata = clusterStateResponse.getState().metadata().index(indexName); + final String indexMetadataId = IndexMetaDataGenerations.buildUniqueIdentifier(indexMetadata); + assertThat(shardSnapshotInfo.getIndexMetadataIdentifier(), equalTo(indexMetadataId)); + + final Snapshot snapshot = shardSnapshotInfo.getSnapshot(); + assertThat(snapshot, equalTo(lastSnapshot.snapshot())); + } + } + + public void testGetShardSnapshotWhileThereIsARunningSnapshot() throws Exception { + final String fsRepoName = randomAlphaOfLength(10); + createRepository(fsRepoName, "mock"); + + createSnapshot(fsRepoName, "empty-snap", Collections.emptyList()); + + final String indexName = "test-idx"; + createIndexWithContent(indexName); + + blockAllDataNodes(fsRepoName); + + final String snapshotName = "snap-1"; + final ActionFuture snapshotFuture = client().admin() + .cluster() + .prepareCreateSnapshot(fsRepoName, snapshotName) + .setIndices(indexName) + .setWaitForCompletion(true) + .execute(); + + waitForBlockOnAnyDataNode(fsRepoName); + + assertThat(getLatestSnapshotForShard(fsRepoName, indexName, 0).isPresent(), equalTo(false)); + + unblockAllDataNodes(fsRepoName); + + assertSuccessful(snapshotFuture); + } + + public void testGetShardSnapshotFailureHandlingLetOtherRepositoriesRequestsMakeProgress() throws Exception { + final String failingRepoName = randomAlphaOfLength(10); + createRepository(failingRepoName, "mock"); + int repoCount = randomIntBetween(1, 10); + List workingRepoNames = new ArrayList<>(); + for (int i = 0; i < repoCount; i++) { + final String repoName = randomAlphaOfLength(10); + createRepository(repoName, "fs"); + workingRepoNames.add(repoName); + } + + final String indexName = "test-idx"; + createIndexWithContent(indexName); + + createSnapshot(failingRepoName, "empty-snap", Collections.singletonList(indexName)); + for (String workingRepoName : workingRepoNames) { + createSnapshot(workingRepoName, "empty-snap", Collections.singletonList(indexName)); + } + + final MockRepository repository = getRepositoryOnMaster(failingRepoName); + if (randomBoolean()) { + repository.setBlockAndFailOnReadIndexFiles(); + } else { + repository.setBlockAndFailOnReadSnapFiles(); + } + + PlainActionFuture future = getLatestSnapshotForShardFuture( + CollectionUtils.appendToCopy(workingRepoNames, failingRepoName), + indexName, + 0 + ); + waitForBlock(internalCluster().getMasterName(), failingRepoName); + repository.unblock(); + + final GetShardSnapshotResponse response = future.actionGet(); + + final Optional error = response.getFailureForRepository(failingRepoName); + assertThat(error.isPresent(), is(equalTo(true))); + assertThat( + error.get().getMessage(), + equalTo(String.format(Locale.ROOT, "[%s] Unable to find the latest snapshot for shard [[%s][0]]", failingRepoName, indexName)) + ); + + for (String workingRepoName : workingRepoNames) { + assertThat(response.getFailureForRepository(workingRepoName).isPresent(), is(equalTo(false))); + assertThat(response.getIndexShardSnapshotInfoForRepository(workingRepoName).isPresent(), equalTo(true)); + } + } + + public void testGetShardSnapshotInMultipleRepositories() { + int repoCount = randomIntBetween(2, 10); + List repositories = new ArrayList<>(); + for (int i = 0; i < repoCount; i++) { + final String repoName = randomAlphaOfLength(10); + createRepository(repoName, "fs"); + repositories.add(repoName); + } + + final String indexName = "test-idx"; + createIndexWithContent(indexName); + + Map repositorySnapshots = new HashMap<>(); + for (String repository : repositories) { + repositorySnapshots.put(repository, createSnapshot(repository, "snap-1", Collections.singletonList(indexName))); + } + + GetShardSnapshotResponse response = getLatestSnapshotForShardFuture(repositories, indexName, 0).actionGet(); + + for (String repository : repositories) { + assertThat(response.getFailureForRepository(repository).isPresent(), is(equalTo(false))); + Optional shardSnapshotInfoOpt = response.getIndexShardSnapshotInfoForRepository(repository); + assertThat(shardSnapshotInfoOpt.isPresent(), equalTo(true)); + + ShardSnapshotInfo shardSnapshotInfo = shardSnapshotInfoOpt.get(); + assertThat(shardSnapshotInfo.getSnapshot(), equalTo(repositorySnapshots.get(repository).snapshot())); + } + } + + public void testFailedSnapshotsAreNotReturned() throws Exception { + final String indexName = "test"; + createIndexWithContent(indexName); + + final String repoName = "test-repo"; + createRepository(repoName, "mock"); + + for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) { + ((MockRepository) repositoriesService.repository(repoName)).setBlockAndFailOnWriteSnapFiles(); + } + + client().admin() + .cluster() + .prepareCreateSnapshot(repoName, "snap") + .setIndices(indexName) + .setWaitForCompletion(false) + .setFeatureStates(NO_FEATURE_STATES_VALUE) + .get(); + + waitForBlockOnAnyDataNode(repoName); + + for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) { + ((MockRepository) repositoriesService.repository(repoName)).unblock(); + } + + assertBusy(() -> assertThat(getSnapshot(repoName, "snap").state(), equalTo(SnapshotState.PARTIAL))); + + Optional shardSnapshotInfo = getLatestSnapshotForShard(repoName, indexName, 0); + assertThat(shardSnapshotInfo.isPresent(), equalTo(false)); + + final SnapshotInfo snapshotInfo = createSnapshot(repoName, "snap-1", Collections.singletonList(indexName)); + + Optional latestSnapshotForShard = getLatestSnapshotForShard(repoName, indexName, 0); + assertThat(latestSnapshotForShard.isPresent(), equalTo(true)); + assertThat(latestSnapshotForShard.get().getSnapshot(), equalTo(snapshotInfo.snapshot())); + } + + private Optional getLatestSnapshotForShard(String repository, String indexName, int shard) { + final GetShardSnapshotResponse response = getLatestSnapshotForShardFuture(Collections.singletonList(repository), indexName, shard) + .actionGet(); + return response.getIndexShardSnapshotInfoForRepository(repository); + } + + private PlainActionFuture getLatestSnapshotForShardFuture( + List repositories, + String indexName, + int shard + ) { + return getLatestSnapshotForShardFuture(repositories, indexName, shard, true); + } + + private PlainActionFuture getLatestSnapshotForShardFuture( + List repositories, + String indexName, + int shard, + boolean useAllRepositoriesRequest + ) { + ShardId shardId = new ShardId(new Index(indexName, "__na__"), shard); + PlainActionFuture future = PlainActionFuture.newFuture(); + final GetShardSnapshotRequest request; + if (useAllRepositoriesRequest && randomBoolean()) { + request = GetShardSnapshotRequest.latestSnapshotInAllRepositories(shardId); + } else { + request = GetShardSnapshotRequest.latestSnapshotInRepositories(shardId, repositories); + } + + client().execute(GetShardSnapshotAction.INSTANCE, request, future); + return future; + } +} diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index d2e19cdd42858..8b8de07e6a372 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -65,6 +65,8 @@ import org.elasticsearch.action.admin.cluster.snapshots.features.TransportSnapshottableFeaturesAction; import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsAction; import org.elasticsearch.action.admin.cluster.snapshots.get.TransportGetSnapshotsAction; +import org.elasticsearch.action.admin.cluster.snapshots.get.shard.GetShardSnapshotAction; +import org.elasticsearch.action.admin.cluster.snapshots.get.shard.TransportGetShardSnapshotAction; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotAction; import org.elasticsearch.action.admin.cluster.snapshots.restore.TransportRestoreSnapshotAction; import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusAction; @@ -543,6 +545,7 @@ public void reg actions.register(SnapshotsStatusAction.INSTANCE, TransportSnapshotsStatusAction.class); actions.register(SnapshottableFeaturesAction.INSTANCE, TransportSnapshottableFeaturesAction.class); actions.register(ResetFeatureStateAction.INSTANCE, TransportResetFeatureStateAction.class); + actions.register(GetShardSnapshotAction.INSTANCE, TransportGetShardSnapshotAction.class); actions.register(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class); actions.register(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/shard/GetShardSnapshotAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/shard/GetShardSnapshotAction.java new file mode 100644 index 0000000000000..bdf822d79f4c5 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/shard/GetShardSnapshotAction.java @@ -0,0 +1,21 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.admin.cluster.snapshots.get.shard; + +import org.elasticsearch.action.ActionType; + +public class GetShardSnapshotAction extends ActionType { + + public static final GetShardSnapshotAction INSTANCE = new GetShardSnapshotAction(); + public static final String NAME = "internal:admin/snapshot/get_shard"; + + public GetShardSnapshotAction() { + super(NAME, GetShardSnapshotResponse::new); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/shard/GetShardSnapshotRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/shard/GetShardSnapshotRequest.java new file mode 100644 index 0000000000000..aa471d9d0fdfb --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/shard/GetShardSnapshotRequest.java @@ -0,0 +1,105 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.admin.cluster.snapshots.get.shard; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.shard.ShardId; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +import static org.elasticsearch.action.ValidateActions.addValidationError; + +public class GetShardSnapshotRequest extends MasterNodeRequest { + private static final String ALL_REPOSITORIES = "_all"; + + private final List repositories; + private final ShardId shardId; + + GetShardSnapshotRequest(List repositories, ShardId shardId) { + assert repositories.isEmpty() == false; + assert repositories.stream().noneMatch(Objects::isNull); + assert repositories.size() == 1 || repositories.stream().noneMatch(repo -> repo.equals(ALL_REPOSITORIES)); + this.repositories = Objects.requireNonNull(repositories); + this.shardId = Objects.requireNonNull(shardId); + } + + public GetShardSnapshotRequest(StreamInput in) throws IOException { + super(in); + this.repositories = in.readStringList(); + this.shardId = new ShardId(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringCollection(repositories); + shardId.writeTo(out); + } + + public static GetShardSnapshotRequest latestSnapshotInAllRepositories(ShardId shardId) { + return new GetShardSnapshotRequest(Collections.singletonList(ALL_REPOSITORIES), shardId); + } + + public static GetShardSnapshotRequest latestSnapshotInRepositories(ShardId shardId, List repositories) { + if (repositories.isEmpty()) { + throw new IllegalArgumentException("Expected at least 1 repository but got none"); + } + + if (repositories.stream().anyMatch(Objects::isNull)) { + throw new NullPointerException("null values are not allowed in the repository list"); + } + return new GetShardSnapshotRequest(repositories, shardId); + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + + if (repositories.size() == 0) { + validationException = addValidationError("repositories are missing", validationException); + } + + return validationException; + } + + public boolean getFromAllRepositories() { + return repositories.size() == 1 && ALL_REPOSITORIES.equalsIgnoreCase(repositories.get(0)); + } + + public boolean isSingleRepositoryRequest() { + return repositories.size() == 1 && ALL_REPOSITORIES.equalsIgnoreCase(repositories.get(0)) == false; + } + + public ShardId getShardId() { + return shardId; + } + + public List getRepositories() { + return repositories; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + GetShardSnapshotRequest request = (GetShardSnapshotRequest) o; + return Objects.equals(repositories, request.repositories) && Objects.equals(shardId, request.shardId); + } + + @Override + public int hashCode() { + return Objects.hash(repositories, shardId); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/shard/GetShardSnapshotResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/shard/GetShardSnapshotResponse.java new file mode 100644 index 0000000000000..cc49de8d5e855 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/shard/GetShardSnapshotResponse.java @@ -0,0 +1,60 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.admin.cluster.snapshots.get.shard; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.repositories.RepositoryException; +import org.elasticsearch.repositories.ShardSnapshotInfo; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; + +public class GetShardSnapshotResponse extends ActionResponse { + public static GetShardSnapshotResponse EMPTY = new GetShardSnapshotResponse(Collections.emptyMap(), Collections.emptyMap()); + + private final Map repositoryShardSnapshots; + private final Map repositoryFailures; + + GetShardSnapshotResponse(Map repositoryShardSnapshots, Map repositoryFailures) { + this.repositoryShardSnapshots = repositoryShardSnapshots; + this.repositoryFailures = repositoryFailures; + } + + GetShardSnapshotResponse(StreamInput in) throws IOException { + super(in); + this.repositoryShardSnapshots = in.readMap(StreamInput::readString, ShardSnapshotInfo::new); + this.repositoryFailures = in.readMap(StreamInput::readString, RepositoryException::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeMap(repositoryShardSnapshots, StreamOutput::writeString, (o, info) -> info.writeTo(o)); + out.writeMap(repositoryFailures, StreamOutput::writeString, (o, err) -> err.writeTo(o)); + } + + public Optional getIndexShardSnapshotInfoForRepository(String repositoryName) { + return Optional.ofNullable(repositoryShardSnapshots.get(repositoryName)); + } + + public Optional getFailureForRepository(String repository) { + return Optional.ofNullable(repositoryFailures.get(repository)); + } + + public Map getRepositoryShardSnapshots() { + return repositoryShardSnapshots; + } + + public Map getRepositoryFailures() { + return repositoryFailures; + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/shard/TransportGetShardSnapshotAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/shard/TransportGetShardSnapshotAction.java new file mode 100644 index 0000000000000..d0d46a69998e5 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/shard/TransportGetShardSnapshotAction.java @@ -0,0 +1,151 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.admin.cluster.snapshots.get.shard; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.GroupedActionListener; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.RepositoriesMetadata; +import org.elasticsearch.cluster.metadata.RepositoryMetadata; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.repositories.IndexSnapshotsService; +import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.repositories.RepositoryException; +import org.elasticsearch.repositories.ShardSnapshotInfo; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.Collection; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class TransportGetShardSnapshotAction extends TransportMasterNodeAction { + + private final IndexSnapshotsService indexSnapshotsService; + + @Inject + public TransportGetShardSnapshotAction( + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + RepositoriesService repositoriesService, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver + ) { + super( + GetShardSnapshotAction.NAME, + transportService, + clusterService, + threadPool, + actionFilters, + GetShardSnapshotRequest::new, + indexNameExpressionResolver, + GetShardSnapshotResponse::new, + ThreadPool.Names.SAME + ); + this.indexSnapshotsService = new IndexSnapshotsService(repositoriesService); + } + + @Override + protected void masterOperation(GetShardSnapshotRequest request, ClusterState state, ActionListener listener) + throws Exception { + final Set repositories = getRequestedRepositories(request, state); + final ShardId shardId = request.getShardId(); + + if (repositories.isEmpty()) { + listener.onResponse(GetShardSnapshotResponse.EMPTY); + return; + } + + GroupedActionListener, RepositoryException>> groupedActionListener = new GroupedActionListener<>( + listener.map(this::transformToResponse), + repositories.size() + ); + + BlockingQueue repositoriesQueue = new LinkedBlockingQueue<>(repositories); + getShardSnapshots(repositoriesQueue, shardId, new ActionListener>() { + @Override + public void onResponse(Optional shardSnapshotInfo) { + groupedActionListener.onResponse(Tuple.tuple(shardSnapshotInfo, null)); + } + + @Override + public void onFailure(Exception err) { + if (request.isSingleRepositoryRequest() == false && err instanceof RepositoryException) { + groupedActionListener.onResponse(Tuple.tuple(Optional.empty(), (RepositoryException) err)); + } else { + groupedActionListener.onFailure(err); + } + } + }); + } + + private void getShardSnapshots( + BlockingQueue repositories, + ShardId shardId, + ActionListener> listener + ) { + final String repository = repositories.poll(); + if (repository == null) { + return; + } + + indexSnapshotsService.getLatestSuccessfulSnapshotForShard( + repository, + shardId, + ActionListener.runAfter(listener, () -> getShardSnapshots(repositories, shardId, listener)) + ); + } + + private GetShardSnapshotResponse transformToResponse( + Collection, RepositoryException>> shardSnapshots + ) { + final Map repositoryShardSnapshot = shardSnapshots.stream() + .map(Tuple::v1) + .filter(Objects::nonNull) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toMap(ShardSnapshotInfo::getRepository, Function.identity())); + + final Map failures = shardSnapshots.stream() + .map(Tuple::v2) + .filter(Objects::nonNull) + .collect(Collectors.toMap(RepositoryException::repository, Function.identity())); + + return new GetShardSnapshotResponse(repositoryShardSnapshot, failures); + } + + private Set getRequestedRepositories(GetShardSnapshotRequest request, ClusterState state) { + RepositoriesMetadata repositories = state.metadata().custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY); + if (request.getFromAllRepositories()) { + return repositories.repositories().stream().map(RepositoryMetadata::name).collect(Collectors.toSet()); + } + + return request.getRepositories().stream().filter(Objects::nonNull).collect(Collectors.toSet()); + } + + @Override + protected ClusterBlockException checkBlock(GetShardSnapshotRequest request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); + } +} diff --git a/server/src/main/java/org/elasticsearch/repositories/IndexMetaDataGenerations.java b/server/src/main/java/org/elasticsearch/repositories/IndexMetaDataGenerations.java index d4e91f852997c..3338fc252534b 100644 --- a/server/src/main/java/org/elasticsearch/repositories/IndexMetaDataGenerations.java +++ b/server/src/main/java/org/elasticsearch/repositories/IndexMetaDataGenerations.java @@ -76,7 +76,7 @@ public String getIndexMetaBlobId(String metaIdentifier) { * @return blob id for the given index metadata */ public String indexMetaBlobId(SnapshotId snapshotId, IndexId indexId) { - final String identifier = lookup.getOrDefault(snapshotId, Collections.emptyMap()).get(indexId); + final String identifier = snapshotIndexMetadataIdentifier(snapshotId, indexId); if (identifier == null) { return snapshotId.getUUID(); } else { @@ -84,6 +84,15 @@ public String indexMetaBlobId(SnapshotId snapshotId, IndexId indexId) { } } + /** + * Gets the {@link org.elasticsearch.cluster.metadata.IndexMetadata} identifier for the given snapshot + * if the snapshot contains the referenced index, otherwise it returns {@code null}. + */ + @Nullable + public String snapshotIndexMetadataIdentifier(SnapshotId snapshotId, IndexId indexId) { + return lookup.getOrDefault(snapshotId, Collections.emptyMap()).get(indexId); + } + /** * Create a new instance with the given snapshot and index metadata uuids and identifiers added. * diff --git a/server/src/main/java/org/elasticsearch/repositories/IndexSnapshotsService.java b/server/src/main/java/org/elasticsearch/repositories/IndexSnapshotsService.java new file mode 100644 index 0000000000000..e3e2aa971732b --- /dev/null +++ b/server/src/main/java/org/elasticsearch/repositories/IndexSnapshotsService.java @@ -0,0 +1,176 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.repositories; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.StepListener; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots; +import org.elasticsearch.index.snapshots.blobstore.SnapshotFiles; +import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.snapshots.SnapshotInfo; +import org.elasticsearch.snapshots.SnapshotState; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public class IndexSnapshotsService { + private static final Comparator> START_TIME_COMPARATOR = Comparator.< + Tuple>comparingLong(pair -> pair.v2().getStartTimeMillis()).thenComparing(Tuple::v1); + + private final RepositoriesService repositoriesService; + + public IndexSnapshotsService(RepositoriesService repositoriesService) { + this.repositoriesService = repositoriesService; + } + + public void getLatestSuccessfulSnapshotForShard( + String repositoryName, + ShardId shardId, + ActionListener> originalListener + ) { + final ActionListener> listener = originalListener.delegateResponse( + (delegate, err) -> { + delegate.onFailure( + new RepositoryException(repositoryName, "Unable to find the latest snapshot for shard [" + shardId + "]", err) + ); + } + ); + + final Repository repository = getRepository(repositoryName); + if (repository == null) { + listener.onFailure(new RepositoryMissingException(repositoryName)); + return; + } + + final String indexName = shardId.getIndexName(); + StepListener repositoryDataStepListener = new StepListener<>(); + StepListener snapshotInfoStepListener = new StepListener<>(); + + repositoryDataStepListener.whenComplete(repositoryData -> { + if (repositoryData.hasIndex(indexName) == false) { + listener.onResponse(Optional.empty()); + return; + } + + final IndexId indexId = repositoryData.resolveIndexId(indexName); + final List indexSnapshots = repositoryData.getSnapshots(indexId); + + final Optional latestSnapshotId = indexSnapshots.stream() + .map(snapshotId -> Tuple.tuple(snapshotId, repositoryData.getSnapshotDetails(snapshotId))) + .filter(s -> s.v2().getSnapshotState() != null && s.v2().getSnapshotState() == SnapshotState.SUCCESS) + .filter(s -> s.v2().getStartTimeMillis() != -1 && s.v2().getEndTimeMillis() != -1) + .max(START_TIME_COMPARATOR) + .map(Tuple::v1); + + if (latestSnapshotId.isPresent() == false) { + // It's possible that some of the backups were taken before 7.14 and they were successful backups, but they don't + // have the start/end date populated in RepositoryData. We could fetch all the backups and find out if there is + // a valid candidate, but for simplicity we just consider that we couldn't find any valid snapshot. Existing + // snapshots start/end timestamps should appear in the RepositoryData eventually. + listener.onResponse(Optional.empty()); + return; + } + + final SnapshotId snapshotId = latestSnapshotId.get(); + repository.getSnapshotInfo( + snapshotId, + snapshotInfoStepListener.map( + snapshotInfo -> new FetchShardSnapshotContext(repository, repositoryData, indexId, shardId, snapshotInfo) + ) + ); + }, listener::onFailure); + + snapshotInfoStepListener.whenComplete(fetchSnapshotContext -> { + assert Thread.currentThread().getName().contains('[' + ThreadPool.Names.SNAPSHOT_META + ']') + : "Expected current thread [" + Thread.currentThread() + "] to be a snapshot meta thread."; + final SnapshotInfo snapshotInfo = fetchSnapshotContext.getSnapshotInfo(); + + if (snapshotInfo == null || snapshotInfo.state() != SnapshotState.SUCCESS) { + // We couldn't find a valid candidate + listener.onResponse(Optional.empty()); + return; + } + + // We fetch BlobStoreIndexShardSnapshots instead of BlobStoreIndexShardSnapshot in order to get the shardStateId that + // allows us to tell whether or not this shard had in-flight operations while the snapshot was taken. + final BlobStoreIndexShardSnapshots blobStoreIndexShardSnapshots = fetchSnapshotContext.getBlobStoreIndexShardSnapshots(); + final String indexMetadataId = fetchSnapshotContext.getIndexMetadataId(); + + final Optional indexShardSnapshotInfo = blobStoreIndexShardSnapshots.snapshots() + .stream() + .filter(snapshotFiles -> snapshotFiles.snapshot().equals(snapshotInfo.snapshotId().getName())) + .findFirst() + .map(snapshotFiles -> fetchSnapshotContext.createIndexShardSnapshotInfo(indexMetadataId, snapshotFiles)); + + listener.onResponse(indexShardSnapshotInfo); + }, listener::onFailure); + + repository.getRepositoryData(repositoryDataStepListener); + } + + private Repository getRepository(String repositoryName) { + final Map repositories = repositoriesService.getRepositories(); + return repositories.get(repositoryName); + } + + private static class FetchShardSnapshotContext { + private final Repository repository; + private final RepositoryData repositoryData; + private final IndexId indexId; + private final ShardId shardId; + private final SnapshotInfo snapshotInfo; + + FetchShardSnapshotContext( + Repository repository, + RepositoryData repositoryData, + IndexId indexId, + ShardId shardId, + SnapshotInfo snapshotInfo + ) { + this.repository = repository; + this.repositoryData = repositoryData; + this.indexId = indexId; + this.shardId = shardId; + this.snapshotInfo = snapshotInfo; + } + + private String getIndexMetadataId() throws IOException { + final IndexMetaDataGenerations indexMetaDataGenerations = repositoryData.indexMetaDataGenerations(); + String indexMetadataIdentifier = indexMetaDataGenerations.snapshotIndexMetadataIdentifier(snapshotInfo.snapshotId(), indexId); + if (indexMetadataIdentifier != null) { + return indexMetadataIdentifier; + } + // Fallback to load IndexMetadata from the repository and compute the identifier + final IndexMetadata indexMetadata = repository.getSnapshotIndexMetaData(repositoryData, snapshotInfo.snapshotId(), indexId); + return IndexMetaDataGenerations.buildUniqueIdentifier(indexMetadata); + } + + private BlobStoreIndexShardSnapshots getBlobStoreIndexShardSnapshots() throws IOException { + BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository; + final String shardGen = repositoryData.shardGenerations().getShardGen(indexId, shardId.getId()); + return blobStoreRepository.getBlobStoreIndexShardSnapshots(indexId, shardId, shardGen); + } + + private ShardSnapshotInfo createIndexShardSnapshotInfo(String indexMetadataId, SnapshotFiles snapshotFiles) { + return new ShardSnapshotInfo(indexId, shardId, snapshotInfo.snapshot(), indexMetadataId, snapshotFiles.shardStateIdentifier()); + } + + SnapshotInfo getSnapshotInfo() { + return snapshotInfo; + } + } +} diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java index 021b90be73f23..c3292d4c1dac7 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java @@ -595,6 +595,13 @@ public Map resolveIndices(final List indices) { return Collections.unmodifiableMap(resolvedIndices); } + /** + * Checks if any snapshot in this repository contains the specified index in {@code indexName} + */ + public boolean hasIndex(String indexName) { + return indices.containsKey(indexName); + } + /** * Resolve the given index names to index ids, creating new index ids for * new indices in the repository. diff --git a/server/src/main/java/org/elasticsearch/repositories/ShardSnapshotInfo.java b/server/src/main/java/org/elasticsearch/repositories/ShardSnapshotInfo.java new file mode 100644 index 0000000000000..bb0e82e5fe4b8 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/repositories/ShardSnapshotInfo.java @@ -0,0 +1,95 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.repositories; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.snapshots.Snapshot; + +import java.io.IOException; +import java.util.Objects; + +public class ShardSnapshotInfo implements Writeable { + private final IndexId indexId; + private final Snapshot snapshot; + private final ShardId shardId; + private final String indexMetadataIdentifier; + @Nullable + private final String shardStateIdentifier; + + public ShardSnapshotInfo( + IndexId indexId, + ShardId shardId, + Snapshot snapshot, + String indexMetadataIdentifier, + @Nullable String shardStateIdentifier + ) { + this.indexId = indexId; + this.shardId = shardId; + this.snapshot = snapshot; + this.indexMetadataIdentifier = indexMetadataIdentifier; + this.shardStateIdentifier = shardStateIdentifier; + } + + public ShardSnapshotInfo(StreamInput in) throws IOException { + this.indexId = new IndexId(in); + this.snapshot = new Snapshot(in); + this.shardId = new ShardId(in); + this.indexMetadataIdentifier = in.readString(); + this.shardStateIdentifier = in.readOptionalString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + indexId.writeTo(out); + snapshot.writeTo(out); + shardId.writeTo(out); + out.writeString(indexMetadataIdentifier); + out.writeOptionalString(shardStateIdentifier); + } + + @Nullable + public String getShardStateIdentifier() { + // It might be null if the shard had in-flight operations meaning that: + // localCheckpoint != maxSeqNo || maxSeqNo != indexShard.getLastSyncedGlobalCheckpoint() when the snapshot was taken + return shardStateIdentifier; + } + + public String getIndexMetadataIdentifier() { + return indexMetadataIdentifier; + } + + public Snapshot getSnapshot() { + return snapshot; + } + + public String getRepository() { + return snapshot.getRepository(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ShardSnapshotInfo that = (ShardSnapshotInfo) o; + return Objects.equals(indexId, that.indexId) + && Objects.equals(snapshot, that.snapshot) + && Objects.equals(shardId, that.shardId) + && Objects.equals(indexMetadataIdentifier, that.indexMetadataIdentifier) + && Objects.equals(shardStateIdentifier, that.shardStateIdentifier); + } + + @Override + public int hashCode() { + return Objects.hash(indexId, snapshot, shardId, indexMetadataIdentifier, shardStateIdentifier); + } +} diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 4a91b06462a27..5004171052d24 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -3257,6 +3257,23 @@ public BlobStoreIndexShardSnapshot loadShardSnapshot(BlobContainer shardContaine } } + /** + * Loads all available snapshots in the repository using the given {@code generation} for a shard. When {@code shardGen} + * is null it tries to load it using the BwC mode, listing the available index- blobs in the shard container. + */ + public BlobStoreIndexShardSnapshots getBlobStoreIndexShardSnapshots(IndexId indexId, ShardId shardId, @Nullable String shardGen) + throws IOException { + final int shard = shardId.getId(); + final BlobContainer shardContainer = shardContainer(indexId, shard); + + Set blobs = Collections.emptySet(); + if (shardGen == null) { + blobs = shardContainer.listBlobsByPrefix(INDEX_FILE_PREFIX).keySet(); + } + + return buildBlobStoreIndexShardSnapshots(blobs, shardContainer, shardGen).v1(); + } + /** * Loads all available snapshots in the repository using the given {@code generation} or falling back to trying to determine it from * the given list of blobs in the shard container. diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/get/shard/GetShardSnapshotRequestSerializationTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/get/shard/GetShardSnapshotRequestSerializationTests.java new file mode 100644 index 0000000000000..33baf42724866 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/get/shard/GetShardSnapshotRequestSerializationTests.java @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.admin.cluster.snapshots.get.shard; + +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import java.io.IOException; +import java.util.List; + +public class GetShardSnapshotRequestSerializationTests extends AbstractWireSerializingTestCase { + @Override + protected Writeable.Reader instanceReader() { + return GetShardSnapshotRequest::new; + } + + @Override + protected GetShardSnapshotRequest createTestInstance() { + ShardId shardId = randomShardId(); + if (randomBoolean()) { + return GetShardSnapshotRequest.latestSnapshotInAllRepositories(shardId); + } else { + List repositories = randomList(1, randomIntBetween(1, 100), () -> randomAlphaOfLength(randomIntBetween(1, 100))); + return GetShardSnapshotRequest.latestSnapshotInRepositories(shardId, repositories); + } + } + + @Override + protected GetShardSnapshotRequest mutateInstance(GetShardSnapshotRequest instance) throws IOException { + ShardId shardId = randomShardId(); + if (instance.getFromAllRepositories()) { + return GetShardSnapshotRequest.latestSnapshotInAllRepositories(shardId); + } else { + return GetShardSnapshotRequest.latestSnapshotInRepositories(shardId, instance.getRepositories()); + } + } + + private ShardId randomShardId() { + return new ShardId(randomAlphaOfLength(10), UUIDs.randomBase64UUID(), randomIntBetween(0, 100)); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/get/shard/GetShardSnapshotResponseSerializationTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/get/shard/GetShardSnapshotResponseSerializationTests.java new file mode 100644 index 0000000000000..18eff167d6403 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/get/shard/GetShardSnapshotResponseSerializationTests.java @@ -0,0 +1,106 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.admin.cluster.snapshots.get.shard; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.RepositoryException; +import org.elasticsearch.repositories.ShardSnapshotInfo; +import org.elasticsearch.snapshots.Snapshot; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.test.ESTestCase; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Collections; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.Matchers.equalTo; + +public class GetShardSnapshotResponseSerializationTests extends ESTestCase { + + public void testSerialization() throws IOException { + // We don't use AbstractWireSerializingTestCase here since it is based on equals and hashCode and + // GetShardSnapshotResponse contains RepositoryException instances that don't implement these methods. + GetShardSnapshotResponse testInstance = createTestInstance(); + GetShardSnapshotResponse deserializedInstance = copyInstance(testInstance); + assertEqualInstances(testInstance, deserializedInstance); + } + + private void assertEqualInstances(GetShardSnapshotResponse expectedInstance, GetShardSnapshotResponse newInstance) { + assertThat(newInstance.getRepositoryShardSnapshots(), equalTo(expectedInstance.getRepositoryShardSnapshots())); + assertEquals(expectedInstance.getRepositoryFailures().keySet(), newInstance.getRepositoryFailures().keySet()); + for (Map.Entry expectedEntry : expectedInstance.getRepositoryFailures().entrySet()) { + ElasticsearchException expectedException = expectedEntry.getValue(); + ElasticsearchException newException = newInstance.getRepositoryFailures().get(expectedEntry.getKey()); + assertThat(newException.getMessage(), containsString(expectedException.getMessage())); + } + } + + private GetShardSnapshotResponse copyInstance(GetShardSnapshotResponse instance) throws IOException { + return copyInstance( + instance, + new NamedWriteableRegistry(Collections.emptyList()), + (out, value) -> value.writeTo(out), + GetShardSnapshotResponse::new, + Version.CURRENT + ); + } + + private GetShardSnapshotResponse createTestInstance() { + Map repositoryShardSnapshots = randomMap(0, randomIntBetween(1, 10), this::repositoryShardSnapshot); + Map repositoryFailures = randomMap(0, randomIntBetween(1, 10), this::repositoryFailure); + + return new GetShardSnapshotResponse(repositoryShardSnapshots, repositoryFailures); + } + + private Tuple repositoryShardSnapshot() { + String repositoryName = randomString(50); + + final String indexName = randomString(50); + ShardId shardId = new ShardId(indexName, UUIDs.randomBase64UUID(), randomIntBetween(0, 100)); + Snapshot snapshot = new Snapshot(randomAlphaOfLength(5), new SnapshotId(randomAlphaOfLength(5), randomAlphaOfLength(5))); + String indexMetadataIdentifier = randomString(50); + + IndexId indexId = new IndexId(indexName, randomString(25)); + String shardStateIdentifier = randomBoolean() ? randomString(30) : null; + return Tuple.tuple( + repositoryName, + new ShardSnapshotInfo(indexId, shardId, snapshot, indexMetadataIdentifier, shardStateIdentifier) + ); + } + + private Tuple repositoryFailure() { + String repositoryName = randomString(25); + Throwable cause = randomBoolean() ? null : randomException(); + RepositoryException repositoryException = new RepositoryException(repositoryName, randomString(1024), cause); + return Tuple.tuple(repositoryName, repositoryException); + } + + private Exception randomException() { + return randomFrom( + new FileNotFoundException(), + new IOException(randomString(15)), + new IllegalStateException(randomString(10)), + new IllegalArgumentException(randomString(20)), + new EsRejectedExecutionException(randomString(10)) + ); + } + + private String randomString(int maxLength) { + return randomAlphaOfLength(randomIntBetween(1, maxLength)); + } +} diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotInfoTestUtils.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotInfoTestUtils.java index 1824f0f2dcfdb..383da3beca3be 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotInfoTestUtils.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotInfoTestUtils.java @@ -34,7 +34,7 @@ public class SnapshotInfoTestUtils { private SnapshotInfoTestUtils() {} - static SnapshotInfo createRandomSnapshotInfo() { + public static SnapshotInfo createRandomSnapshotInfo() { final Snapshot snapshot = new Snapshot(randomAlphaOfLength(5), new SnapshotId(randomAlphaOfLength(5), randomAlphaOfLength(5))); final List indices = Arrays.asList(randomArray(1, 10, String[]::new, () -> randomAlphaOfLengthBetween(2, 20))); diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index 29da86a0fc439..dd62ad3a9c3b0 100644 --- a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -148,6 +148,10 @@ public long getFailureCount() { private volatile boolean failReadsAfterUnblock; private volatile boolean throwReadErrorAfterUnblock = false; + private volatile boolean blockAndFailOnReadSnapFile; + + private volatile boolean blockAndFailOnReadIndexFile; + private volatile boolean blocked = false; public MockRepository(RepositoryMetadata metadata, Environment environment, @@ -219,6 +223,8 @@ public synchronized void unblock() { blockOnWriteShardLevelMeta = false; blockOnReadIndexMeta = false; blockOnceOnReadSnapshotInfo.set(false); + blockAndFailOnReadSnapFile = false; + blockAndFailOnReadIndexFile = false; this.notifyAll(); } @@ -264,6 +270,14 @@ public void setFailReadsAfterUnblock(boolean failReadsAfterUnblock) { this.failReadsAfterUnblock = failReadsAfterUnblock; } + public void setBlockAndFailOnReadSnapFiles() { + blockAndFailOnReadSnapFile = true; + } + + public void setBlockAndFailOnReadIndexFiles() { + blockAndFailOnReadIndexFile = true; + } + /** * Enable blocking a single read of {@link org.elasticsearch.snapshots.SnapshotInfo} in case the repo is already blocked on another * file. This allows testing very specific timing issues where a read of {@code SnapshotInfo} is much slower than another concurrent @@ -288,7 +302,7 @@ private synchronized boolean blockExecution() { try { while (blockOnDataFiles || blockOnAnyFiles || blockAndFailOnWriteIndexFile || blockOnWriteIndexFile || blockAndFailOnWriteSnapFile || blockOnDeleteIndexN || blockOnWriteShardLevelMeta || blockOnReadIndexMeta || - blockedIndexId != null) { + blockAndFailOnReadSnapFile || blockAndFailOnReadIndexFile || blockedIndexId != null) { blocked = true; this.wait(); wasBlocked = true; @@ -375,7 +389,9 @@ private void maybeIOExceptionOrBlock(String blobName) throws IOException { throw new IOException("Random IOException"); } else if (blockOnAnyFiles) { blockExecutionAndMaybeWait(blobName); - } else if (blobName.startsWith("snap-") && blockAndFailOnWriteSnapFile) { + } else if (blobName.startsWith("snap-") && (blockAndFailOnWriteSnapFile || blockAndFailOnReadSnapFile)) { + blockExecutionAndFail(blobName); + } else if (blobName.startsWith(INDEX_FILE_PREFIX) && blockAndFailOnReadIndexFile) { blockExecutionAndFail(blobName); } else if (blockedIndexId != null && path().parts().contains(blockedIndexId) && blobName.startsWith("snap-")) { blockExecutionAndMaybeWait(blobName); diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index a5897ee3540e5..85bd028692893 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -396,6 +396,7 @@ public class Constants { "internal:admin/ccr/restore/file_chunk/get", "internal:admin/ccr/restore/session/clear", "internal:admin/ccr/restore/session/put", + "internal:admin/snapshot/get_shard", "internal:admin/xpack/searchable_snapshots/cache/store", "internal:admin/xpack/searchable_snapshots/frozen_cache_info", "internal:admin/xpack/searchable_snapshots/frozen_cache_info[n]",