-
Notifications
You must be signed in to change notification settings - Fork 25k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add peer recovery planners that take into account available snapshots (…
…#76239) This commit adds a new set of classes that would compute a peer recovery plan, based on source files + target files + available snapshots. When possible it would try to maximize the number of files used from a snapshot. It uses repositories with `use_for_peer_recovery` setting set to true. It adds a new recovery setting `indices.recovery.use_snapshots` Relates #73496 Backport of #75840
- Loading branch information
Showing
23 changed files
with
1,803 additions
and
114 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
314 changes: 314 additions & 0 deletions
314
...rnalClusterTest/java/org/elasticsearch/indices/recovery/plan/ShardSnapshotsServiceIT.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,314 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0 and the Server Side Public License, v 1; you may not use this file except | ||
* in compliance with, at your election, the Elastic License 2.0 or the Server | ||
* Side Public License, v 1. | ||
*/ | ||
|
||
package org.elasticsearch.indices.recovery.plan; | ||
|
||
import org.elasticsearch.action.ActionListener; | ||
import org.elasticsearch.action.support.PlainActionFuture; | ||
import org.elasticsearch.cluster.ClusterState; | ||
import org.elasticsearch.cluster.metadata.IndexMetadata; | ||
import org.elasticsearch.cluster.metadata.RepositoryMetadata; | ||
import org.elasticsearch.cluster.service.ClusterService; | ||
import org.elasticsearch.common.blobstore.BlobContainer; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.common.util.BigArrays; | ||
import org.elasticsearch.common.xcontent.NamedXContentRegistry; | ||
import org.elasticsearch.core.Tuple; | ||
import org.elasticsearch.env.Environment; | ||
import org.elasticsearch.index.shard.ShardId; | ||
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; | ||
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots; | ||
import org.elasticsearch.indices.recovery.RecoverySettings; | ||
import org.elasticsearch.plugins.Plugin; | ||
import org.elasticsearch.plugins.RepositoryPlugin; | ||
import org.elasticsearch.repositories.IndexId; | ||
import org.elasticsearch.repositories.RepositoriesService; | ||
import org.elasticsearch.repositories.Repository; | ||
import org.elasticsearch.repositories.RepositoryData; | ||
import org.elasticsearch.repositories.ShardGeneration; | ||
import org.elasticsearch.repositories.ShardSnapshotInfo; | ||
import org.elasticsearch.repositories.blobstore.BlobStoreRepository; | ||
import org.elasticsearch.repositories.fs.FsRepository; | ||
import org.elasticsearch.snapshots.SnapshotException; | ||
import org.elasticsearch.snapshots.SnapshotId; | ||
import org.elasticsearch.test.ESIntegTestCase; | ||
import org.elasticsearch.threadpool.ThreadPool; | ||
|
||
import java.io.FileNotFoundException; | ||
import java.io.IOException; | ||
import java.nio.file.Path; | ||
import java.util.ArrayList; | ||
import java.util.Collection; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; | ||
import static org.hamcrest.Matchers.empty; | ||
import static org.hamcrest.Matchers.equalTo; | ||
import static org.hamcrest.Matchers.greaterThan; | ||
import static org.hamcrest.Matchers.is; | ||
|
||
public class ShardSnapshotsServiceIT extends ESIntegTestCase { | ||
@Override | ||
protected Collection<Class<? extends Plugin>> nodePlugins() { | ||
return Collections.singletonList(FailingRepoPlugin.class); | ||
} | ||
|
||
public static class FailingRepoPlugin extends Plugin implements RepositoryPlugin { | ||
public static final String TYPE = "failingrepo"; | ||
|
||
@Override | ||
public Map<String, Repository.Factory> getRepositories( | ||
Environment env, | ||
NamedXContentRegistry namedXContentRegistry, | ||
ClusterService clusterService, | ||
BigArrays bigArrays, | ||
RecoverySettings recoverySettings | ||
) { | ||
return Collections.singletonMap( | ||
TYPE, | ||
metadata -> new FailingRepo(metadata, env, namedXContentRegistry, clusterService, bigArrays, recoverySettings) | ||
); | ||
} | ||
} | ||
|
||
public static class FailingRepo extends FsRepository { | ||
static final String FAIL_GET_REPOSITORY_DATA_SETTING_KEY = "fail_get_repository_data"; | ||
static final String FAIL_LOAD_SHARD_SNAPSHOT_SETTING_KEY = "fail_load_shard_snapshot"; | ||
static final String FAIL_LOAD_SHARD_SNAPSHOTS_SETTING_KEY = "fail_load_shard_snapshots"; | ||
|
||
private final boolean failGetRepositoryData; | ||
private final boolean failLoadShardSnapshot; | ||
private final boolean failLoadShardSnapshots; | ||
|
||
public FailingRepo(RepositoryMetadata metadata, | ||
Environment environment, | ||
NamedXContentRegistry namedXContentRegistry, | ||
ClusterService clusterService, | ||
BigArrays bigArrays, | ||
RecoverySettings recoverySettings) { | ||
super(metadata, environment, namedXContentRegistry, clusterService, bigArrays, recoverySettings); | ||
this.failGetRepositoryData = metadata.settings().getAsBoolean(FAIL_GET_REPOSITORY_DATA_SETTING_KEY, false); | ||
this.failLoadShardSnapshot = metadata.settings().getAsBoolean(FAIL_LOAD_SHARD_SNAPSHOT_SETTING_KEY, false); | ||
this.failLoadShardSnapshots = metadata.settings().getAsBoolean(FAIL_LOAD_SHARD_SNAPSHOTS_SETTING_KEY, false); | ||
} | ||
|
||
@Override | ||
public void getRepositoryData(ActionListener<RepositoryData> listener) { | ||
if (failGetRepositoryData) { | ||
listener.onFailure(new IOException("Failure getting repository data")); | ||
return; | ||
} | ||
super.getRepositoryData(listener); | ||
} | ||
|
||
@Override | ||
public BlobStoreIndexShardSnapshot loadShardSnapshot(BlobContainer shardContainer, SnapshotId snapshotId) { | ||
if (failLoadShardSnapshot) { | ||
throw new SnapshotException( | ||
metadata.name(), | ||
snapshotId, | ||
"failed to read shard snapshot file for [" + shardContainer.path() + ']', | ||
new FileNotFoundException("unable to find file") | ||
); | ||
} | ||
return super.loadShardSnapshot(shardContainer, snapshotId); | ||
} | ||
|
||
@Override | ||
public BlobStoreIndexShardSnapshots getBlobStoreIndexShardSnapshots(IndexId indexId, | ||
ShardId shardId, | ||
ShardGeneration shardGen) throws IOException { | ||
if (failLoadShardSnapshots) { | ||
throw new FileNotFoundException("Failed to get blob store index shard snapshots"); | ||
} | ||
return super.getBlobStoreIndexShardSnapshots(indexId, shardId, shardGen); | ||
} | ||
} | ||
|
||
public void testReturnsEmptyListWhenThereAreNotAvailableRepositories() throws Exception { | ||
String indexName = "test"; | ||
createIndex(indexName, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build()); | ||
ShardId shardId = getShardIdForIndex(indexName); | ||
|
||
List<ShardSnapshot> shardSnapshotData = getShardSnapshotShard(shardId); | ||
assertThat(shardSnapshotData, is(empty())); | ||
} | ||
|
||
public void testOnlyFetchesSnapshotFromEnabledRepositories() throws Exception { | ||
final String indexName = "test"; | ||
createIndex(indexName, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build()); | ||
ShardId shardId = getShardIdForIndex(indexName); | ||
|
||
for (int i = 0; i < randomIntBetween(1, 50); i++) { | ||
index(indexName, "_doc", Integer.toString(i), Collections.singletonMap("foo", "bar")); | ||
} | ||
|
||
String snapshotName = "snap"; | ||
|
||
int numberOfNonEnabledRepos = randomIntBetween(1, 3); | ||
List<String> nonEnabledRepos = new ArrayList<>(); | ||
for (int i = 0; i < numberOfNonEnabledRepos; i++) { | ||
String repositoryName = "non-enabled-repo-" + i; | ||
Path repoPath = randomRepoPath(); | ||
createRepository(repositoryName, "fs", repoPath, false); | ||
createSnapshot(repositoryName, snapshotName, indexName); | ||
nonEnabledRepos.add(repositoryName); | ||
} | ||
|
||
int numberOfRecoveryEnabledRepositories = randomIntBetween(0, 4); | ||
List<String> recoveryEnabledRepos = new ArrayList<>(); | ||
for (int i = 0; i < numberOfRecoveryEnabledRepositories; i++) { | ||
String repositoryName = "repo-" + i; | ||
createRepository(repositoryName, "fs", randomRepoPath(), true); | ||
recoveryEnabledRepos.add(repositoryName); | ||
createSnapshot(repositoryName, snapshotName, indexName); | ||
} | ||
|
||
List<ShardSnapshot> shardSnapshotDataForShard = getShardSnapshotShard(shardId); | ||
|
||
assertThat(shardSnapshotDataForShard.size(), is(equalTo(numberOfRecoveryEnabledRepositories))); | ||
for (ShardSnapshot shardSnapshotData : shardSnapshotDataForShard) { | ||
assertThat(recoveryEnabledRepos.contains(shardSnapshotData.getRepository()), is(equalTo(true))); | ||
assertThat(shardSnapshotData.getMetadataSnapshot().size(), is(greaterThan(0))); | ||
|
||
ShardSnapshotInfo shardSnapshotInfo = shardSnapshotData.getShardSnapshotInfo(); | ||
assertThat(shardSnapshotInfo.getShardId(), is(equalTo(shardId))); | ||
assertThat(shardSnapshotInfo.getSnapshot().getSnapshotId().getName(), is(equalTo(snapshotName))); | ||
assertThat(recoveryEnabledRepos.contains(shardSnapshotInfo.getRepository()), is(equalTo(true))); | ||
assertThat(nonEnabledRepos.contains(shardSnapshotInfo.getRepository()), is(equalTo(false))); | ||
} | ||
} | ||
|
||
public void testFailingReposAreTreatedAsNonExistingShardSnapshots() throws Exception { | ||
final String indexName = "test"; | ||
createIndex(indexName, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build()); | ||
ShardId shardId = getShardIdForIndex(indexName); | ||
|
||
for (int i = 0; i < randomIntBetween(1, 50); i++) { | ||
index(indexName, "_doc", Integer.toString(i), Collections.singletonMap("foo", "bar")); | ||
} | ||
|
||
String snapshotName = "snap"; | ||
|
||
int numberOfFailingRepos = randomIntBetween(1, 3); | ||
List<Tuple<String, Path>> failingRepos = new ArrayList<>(); | ||
for (int i = 0; i < numberOfFailingRepos; i++) { | ||
String repositoryName = "failing-repo-" + i; | ||
Path repoPath = randomRepoPath(); | ||
createRepository(repositoryName, FailingRepoPlugin.TYPE, repoPath, true); | ||
createSnapshot(repositoryName, snapshotName, indexName); | ||
failingRepos.add(Tuple.tuple(repositoryName, repoPath)); | ||
} | ||
|
||
int numberOfWorkingRepositories = randomIntBetween(0, 4); | ||
List<String> workingRepos = new ArrayList<>(); | ||
for (int i = 0; i < numberOfWorkingRepositories; i++) { | ||
String repositoryName = "repo-" + i; | ||
createRepository(repositoryName, "fs", randomRepoPath(), true); | ||
workingRepos.add(repositoryName); | ||
createSnapshot(repositoryName, snapshotName, indexName); | ||
} | ||
|
||
for (Tuple<String, Path> failingRepo : failingRepos) { | ||
// Update repository settings to fail fetching the repository information at any stage | ||
String repoFailureType = | ||
randomFrom(FailingRepo.FAIL_GET_REPOSITORY_DATA_SETTING_KEY, | ||
FailingRepo.FAIL_LOAD_SHARD_SNAPSHOT_SETTING_KEY, | ||
FailingRepo.FAIL_LOAD_SHARD_SNAPSHOTS_SETTING_KEY | ||
); | ||
|
||
assertAcked(client().admin().cluster().preparePutRepository(failingRepo.v1()) | ||
.setType(FailingRepoPlugin.TYPE) | ||
.setVerify(false) | ||
.setSettings(Settings.builder().put(repoFailureType, true).put("location", randomRepoPath())) | ||
); | ||
} | ||
|
||
List<ShardSnapshot> shardSnapshotDataForShard = getShardSnapshotShard(shardId); | ||
|
||
assertThat(shardSnapshotDataForShard.size(), is(equalTo(numberOfWorkingRepositories))); | ||
for (ShardSnapshot shardSnapshotData : shardSnapshotDataForShard) { | ||
assertThat(workingRepos.contains(shardSnapshotData.getRepository()), is(equalTo(true))); | ||
assertThat(shardSnapshotData.getMetadataSnapshot().size(), is(greaterThan(0))); | ||
|
||
ShardSnapshotInfo shardSnapshotInfo = shardSnapshotData.getShardSnapshotInfo(); | ||
assertThat(shardSnapshotInfo.getShardId(), equalTo(shardId)); | ||
assertThat(shardSnapshotInfo.getSnapshot().getSnapshotId().getName(), equalTo(snapshotName)); | ||
} | ||
} | ||
|
||
public void testFetchingInformationFromAnIncompatibleMasterNodeReturnsAnEmptyList() { | ||
String indexName = "test"; | ||
createIndex(indexName, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build()); | ||
ShardId shardId = getShardIdForIndex(indexName); | ||
|
||
for (int i = 0; i < randomIntBetween(1, 50); i++) { | ||
index(indexName, "_doc", Integer.toString(i), Collections.singletonMap("foo", "bar")); | ||
} | ||
|
||
String snapshotName = "snap"; | ||
String repositoryName = "repo"; | ||
createRepository(repositoryName, "fs", randomRepoPath(), true); | ||
createSnapshot(repositoryName, snapshotName, indexName); | ||
|
||
RepositoriesService repositoriesService = internalCluster().getMasterNodeInstance(RepositoriesService.class); | ||
ThreadPool threadPool = internalCluster().getMasterNodeInstance(ThreadPool.class); | ||
ClusterService clusterService = internalCluster().getMasterNodeInstance(ClusterService.class); | ||
ShardSnapshotsService shardSnapshotsService = new ShardSnapshotsService(client(), repositoriesService, threadPool, clusterService) { | ||
@Override | ||
protected boolean masterSupportsFetchingLatestSnapshots() { | ||
return false; | ||
} | ||
}; | ||
|
||
PlainActionFuture<List<ShardSnapshot>> latestSnapshots = PlainActionFuture.newFuture(); | ||
shardSnapshotsService.fetchLatestSnapshotsForShard(shardId, latestSnapshots); | ||
assertThat(latestSnapshots.actionGet(), is(empty())); | ||
} | ||
|
||
private List<ShardSnapshot> getShardSnapshotShard(ShardId shardId) throws Exception { | ||
ShardSnapshotsService shardSnapshotsService = getShardSnapshotsService(); | ||
|
||
PlainActionFuture<List<ShardSnapshot>> future = PlainActionFuture.newFuture(); | ||
shardSnapshotsService.fetchLatestSnapshotsForShard(shardId, future); | ||
return future.get(); | ||
} | ||
|
||
private ShardSnapshotsService getShardSnapshotsService() { | ||
RepositoriesService repositoriesService = internalCluster().getMasterNodeInstance(RepositoriesService.class); | ||
ThreadPool threadPool = internalCluster().getMasterNodeInstance(ThreadPool.class); | ||
ClusterService clusterService = internalCluster().getMasterNodeInstance(ClusterService.class); | ||
return new ShardSnapshotsService(client(), repositoriesService, threadPool, clusterService); | ||
} | ||
|
||
private ShardId getShardIdForIndex(String indexName) { | ||
ClusterState state = clusterAdmin().prepareState().get().getState(); | ||
return state.routingTable().index(indexName).shard(0).shardId(); | ||
} | ||
|
||
private void createRepository(String repositoryName, String type, Path location, boolean recoveryEnabledRepo) { | ||
assertAcked(client().admin().cluster().preparePutRepository(repositoryName) | ||
.setType(type) | ||
.setVerify(false) | ||
.setSettings(Settings.builder() | ||
.put("location", location) | ||
.put(BlobStoreRepository.USE_FOR_PEER_RECOVERY_SETTING.getKey(), recoveryEnabledRepo) | ||
) | ||
); | ||
} | ||
|
||
private void createSnapshot(String repoName, String snapshotName, String index) { | ||
clusterAdmin() | ||
.prepareCreateSnapshot(repoName, snapshotName) | ||
.setWaitForCompletion(true) | ||
.setIndices(index) | ||
.get(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.