Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use ClusterState as Consistency Source for Snapshot Repositories #49060

Merged
merged 26 commits into from
Dec 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
6bef703
Consistent RepositoryData Load on Blobstores
original-brownbear Dec 4, 2019
303b62b
last step
original-brownbear Dec 5, 2019
b4c7861
fix tests
original-brownbear Dec 5, 2019
3e5236a
cs
original-brownbear Dec 5, 2019
fa5d932
fix tests
original-brownbear Dec 5, 2019
ae86319
Merge remote-tracking branch 'elastic/master' into repo-uses-cs
original-brownbear Dec 9, 2019
62999ea
Merge remote-tracking branch 'elastic/master' into repo-uses-cs
original-brownbear Dec 9, 2019
8ff4b80
updated docs
original-brownbear Dec 9, 2019
8a85853
simpler
original-brownbear Dec 9, 2019
a9d01c2
Merge remote-tracking branch 'elastic/master' into repo-uses-cs
original-brownbear Dec 9, 2019
60208f6
simpler
original-brownbear Dec 9, 2019
4c7e179
Merge branch 'master' of https://github.com/elastic/elasticsearch int…
original-brownbear Dec 9, 2019
89fe273
Merge remote-tracking branch 'elastic/master' into repo-uses-cs
original-brownbear Dec 12, 2019
ed9509d
Merge remote-tracking branch 'elastic/master' into repo-uses-cs
original-brownbear Dec 13, 2019
4869b26
remove repository initialization
original-brownbear Dec 13, 2019
4285894
remove repository initialization
original-brownbear Dec 13, 2019
5117a04
add setting override
original-brownbear Dec 13, 2019
8842cff
fix test
original-brownbear Dec 13, 2019
7313d38
Merge remote-tracking branch 'elastic/master' into repo-uses-cs
original-brownbear Dec 13, 2019
cb84ee1
rename and remove pointless prop
original-brownbear Dec 13, 2019
2158ea7
Merge remote-tracking branch 'elastic/master' into repo-uses-cs
original-brownbear Dec 16, 2019
3dfcf10
Merge remote-tracking branch 'elastic/master' into repo-uses-cs
original-brownbear Dec 16, 2019
e3f7ff2
correct handling unclean full cluster restart
original-brownbear Dec 16, 2019
564202b
docs
original-brownbear Dec 16, 2019
028e860
stable
original-brownbear Dec 17, 2019
edde42f
Merge remote-tracking branch 'elastic/master' into repo-uses-cs
original-brownbear Dec 17, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ public final class RepositoryData {
*/
public static final long UNKNOWN_REPO_GEN = -2L;

/**
* The generation value indicating that the repository generation could not be determined.
*/
public static final long CORRUPTED_REPO_GEN = -3L;

/**
* An instance initialized for an empty repository.
*/
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.internal.io.IOUtils;
Expand Down Expand Up @@ -193,13 +194,16 @@ public void testSnapshotWithConflictingName() throws IOException {
private Repository createRepository() {
Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build();
RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings);
final FsRepository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(),
BlobStoreTestUtil.mockClusterService(repositoryMetaData)) {
final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetaData);
final FsRepository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), clusterService) {
@Override
protected void assertSnapshotOrGenericThread() {
// eliminate thread name check as we create repo manually
}
};
clusterService.addStateApplier(event -> repository.updateState(event.state()));
// Apply state once to initialize repo properly like RepositoriesService would
repository.updateState(clusterService.state());
repository.start();
return repository;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.snapshots;

import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;

import java.nio.file.Files;
import java.nio.file.Path;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;

/**
 * Integration tests that move the root-level {@code index-N} blob of an {@code fs} repository on disk, outside of
 * any snapshot operation, and then check how subsequent snapshot operations on that repository behave.
 */
public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCase {

    /**
     * Fragment of the exception message that {@link #assertRepositoryBlocked} expects from snapshot operations
     * issued against a repository whose {@code index-N} blob was moved.
     */
    private static final String REPOSITORY_BLOCKED_MESSAGE =
        "Could not read repository data because the contents of the repository do not match its expected state.";

    /**
     * Moves {@code index-N} to the next generation and verifies that deleting and creating snapshots fails with a
     * {@link RepositoryException}, optionally also after moving the blob back. The repository is then removed and
     * registered again, after which the snapshot is deleted and asserted to be gone.
     */
    public void testConcurrentlyChangeRepositoryContents() throws Exception {
        final Client client = client();

        final Path repo = randomRepoPath();
        final String repoName = "test-repo";
        logger.info("--> creating repository at {}", repo.toAbsolutePath());
        putFsRepository(client, repoName, repo, false);

        indexSomeData();

        final String snapshot = "test-snap";
        createSnapshotOfTestIndices(client, repoName, snapshot);

        logger.info("--> move index-N blob to next generation");
        final RepositoryData repositoryData =
            getRepositoryData(internalCluster().getMasterNodeInstance(RepositoriesService.class).repository(repoName));
        final long initialGen = repositoryData.getGenId();
        moveIndexN(repo, initialGen, initialGen + 1);

        assertRepositoryBlocked(client, repoName, snapshot);

        if (randomBoolean()) {
            logger.info("--> move index-N blob back to initial generation");
            moveIndexN(repo, initialGen + 1, initialGen);

            logger.info("--> verify repository remains blocked");
            assertRepositoryBlocked(client, repoName, snapshot);
        }

        logger.info("--> remove repository");
        assertAcked(client.admin().cluster().prepareDeleteRepository(repoName));

        logger.info("--> recreate repository");
        putFsRepository(client, repoName, repo, false);

        logger.info("--> delete snapshot");
        client.admin().cluster().prepareDeleteSnapshot(repoName, snapshot).get();

        assertSnapshotIsMissing(client, repoName, snapshot);
    }

    /**
     * Same scenario as {@link #testConcurrentlyChangeRepositoryContents()} but with
     * {@link BlobStoreRepository#ALLOW_CONCURRENT_MODIFICATION} enabled on the repository: the test expects the moved
     * {@code index-N} blob to be picked up at its new generation and the following snapshot delete to succeed.
     */
    public void testConcurrentlyChangeRepositoryContentsInBwCMode() throws Exception {
        final Client client = client();

        final Path repo = randomRepoPath();
        final String repoName = "test-repo";
        logger.info("--> creating repository at {}", repo.toAbsolutePath());
        putFsRepository(client, repoName, repo, true);

        indexSomeData();

        final String snapshot = "test-snap";
        createSnapshotOfTestIndices(client, repoName, snapshot);

        final Repository repository = internalCluster().getMasterNodeInstance(RepositoriesService.class).repository(repoName);

        logger.info("--> move index-N blob to next generation");
        final RepositoryData repositoryData = getRepositoryData(repository);
        final long beforeMoveGen = repositoryData.getGenId();
        moveIndexN(repo, beforeMoveGen, beforeMoveGen + 1);

        logger.info("--> verify index-N blob is found at the new location");
        assertThat(getRepositoryData(repository).getGenId(), is(beforeMoveGen + 1));

        logger.info("--> delete snapshot");
        client.admin().cluster().prepareDeleteSnapshot(repoName, snapshot).get();

        logger.info("--> verify index-N blob is found at the expected location");
        assertThat(getRepositoryData(repository).getGenId(), is(beforeMoveGen + 2));

        assertSnapshotIsMissing(client, repoName, snapshot);
    }

    /**
     * Moves {@code index-N} to the next generation, records that generation in the cluster state through
     * {@code withUpdatedGeneration(name, N, N + 1)}, performs a full cluster restart and then expects the repository
     * to report generation {@code N + 1}, and {@code N + 2} once the snapshot has been deleted.
     */
    public void testFindDanglingLatestGeneration() throws Exception {
        final Path repo = randomRepoPath();
        final String repoName = "test-repo";
        logger.info("--> creating repository at {}", repo.toAbsolutePath());
        putFsRepository(client(), repoName, repo, false);

        indexSomeData();

        final String snapshot = "test-snap";
        createSnapshotOfTestIndices(client(), repoName, snapshot);

        final Repository repository = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName);

        logger.info("--> move index-N blob to next generation");
        final RepositoryData repositoryData = getRepositoryData(repository);
        final long beforeMoveGen = repositoryData.getGenId();
        moveIndexN(repo, beforeMoveGen, beforeMoveGen + 1);

        logger.info("--> set next generation as pending in the cluster state");
        final PlainActionFuture<Void> csUpdateFuture = PlainActionFuture.newFuture();
        internalCluster().getCurrentMasterNodeInstance(ClusterService.class).submitStateUpdateTask("set pending generation",
            new ClusterStateUpdateTask() {
                @Override
                public ClusterState execute(ClusterState currentState) {
                    return ClusterState.builder(currentState).metaData(MetaData.builder(currentState.getMetaData())
                        .putCustom(RepositoriesMetaData.TYPE,
                            currentState.metaData().<RepositoriesMetaData>custom(RepositoriesMetaData.TYPE).withUpdatedGeneration(
                                repository.getMetadata().name(), beforeMoveGen, beforeMoveGen + 1)).build()).build();
                }

                @Override
                public void onFailure(String source, Exception e) {
                    csUpdateFuture.onFailure(e);
                }

                @Override
                public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                    csUpdateFuture.onResponse(null);
                }
            }
        );
        // Block until the cluster state update has been processed (or failed) before restarting the cluster
        csUpdateFuture.get();

        logger.info("--> full cluster restart");
        internalCluster().fullRestart();
        ensureGreen();

        // Look the repository up again after the restart instead of reusing the pre-restart instance
        final Repository repositoryAfterRestart =
            internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName);

        logger.info("--> verify index-N blob is found at the new location");
        assertThat(getRepositoryData(repositoryAfterRestart).getGenId(), is(beforeMoveGen + 1));

        logger.info("--> delete snapshot");
        client().admin().cluster().prepareDeleteSnapshot(repoName, snapshot).get();

        logger.info("--> verify index-N blob is found at the expected location");
        assertThat(getRepositoryData(repositoryAfterRestart).getGenId(), is(beforeMoveGen + 2));

        assertSnapshotIsMissing(client(), repoName, snapshot);
    }

    /**
     * Registers an uncompressed {@code fs} repository with a random chunk size between 100 and 1000 bytes and asserts
     * that the request was acknowledged.
     *
     * @param client                      client used to send the put-repository request
     * @param repoName                    name to register the repository under
     * @param location                    value of the repository's {@code location} setting
     * @param allowConcurrentModification whether to set {@link BlobStoreRepository#ALLOW_CONCURRENT_MODIFICATION} to {@code true}
     */
    private void putFsRepository(Client client, String repoName, Path location, boolean allowConcurrentModification) {
        final Settings.Builder repoSettings = Settings.builder()
            .put("location", location)
            .put("compress", false);
        if (allowConcurrentModification) {
            repoSettings.put(BlobStoreRepository.ALLOW_CONCURRENT_MODIFICATION.getKey(), true);
        }
        repoSettings.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES);
        assertAcked(client.admin().cluster().preparePutRepository(repoName).setType("fs").setSettings(repoSettings));
    }

    /**
     * Creates the indices {@code test-idx-1} and {@code test-idx-2} and indexes one document into each of them.
     */
    private void indexSomeData() throws Exception {
        createIndex("test-idx-1", "test-idx-2");
        logger.info("--> indexing some data");
        indexRandom(true,
            client().prepareIndex("test-idx-1").setSource("foo", "bar"),
            client().prepareIndex("test-idx-2").setSource("foo", "bar"));
    }

    /**
     * Creates a snapshot of all {@code test-idx-*} indices, waits for it to complete and asserts that at least one
     * shard was snapshotted and that every shard succeeded.
     *
     * @param client   client used to send the create-snapshot request
     * @param repoName repository to snapshot into
     * @param snapshot name of the snapshot to create
     */
    private void createSnapshotOfTestIndices(Client client, String repoName, String snapshot) {
        logger.info("--> creating snapshot");
        final CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot(repoName, snapshot)
            .setWaitForCompletion(true).setIndices("test-idx-*").get();
        assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
        assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
            equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
    }

    /**
     * Renames the blob {@code index-<fromGen>} in the repository root to {@code index-<toGen>} directly on disk.
     *
     * @param repoRoot root directory of the repository
     * @param fromGen  generation suffix of the existing blob
     * @param toGen    generation suffix to rename the blob to
     */
    private static void moveIndexN(Path repoRoot, long fromGen, long toGen) throws Exception {
        Files.move(repoRoot.resolve("index-" + fromGen), repoRoot.resolve("index-" + toGen));
    }

    /**
     * Asserts that requesting the given snapshot from the repository fails with a {@link SnapshotMissingException}.
     *
     * @param client   client used to send the get-snapshots request
     * @param repoName repository to look the snapshot up in
     * @param snapshot name of the snapshot expected to be absent
     */
    private void assertSnapshotIsMissing(Client client, String repoName, String snapshot) {
        logger.info("--> make sure snapshot doesn't exist");
        expectThrows(SnapshotMissingException.class, () -> client.admin().cluster().prepareGetSnapshots(repoName)
            .addSnapshots(snapshot).get().getSnapshots(repoName));
    }

    /**
     * Asserts that both deleting the given snapshot and creating a snapshot under the same name fail with a
     * {@link RepositoryException} whose message contains {@link #REPOSITORY_BLOCKED_MESSAGE}.
     *
     * @param client           client used to send the snapshot requests
     * @param repo             name of the repository expected to be blocked
     * @param existingSnapshot name of a snapshot that already exists in the repository
     */
    private void assertRepositoryBlocked(Client client, String repo, String existingSnapshot) {
        logger.info("--> try to delete snapshot");
        final RepositoryException deleteFailure = expectThrows(RepositoryException.class,
            () -> client.admin().cluster().prepareDeleteSnapshot(repo, existingSnapshot).execute().actionGet());
        assertThat(deleteFailure.getMessage(), containsString(REPOSITORY_BLOCKED_MESSAGE));

        logger.info("--> try to create snapshot");
        final RepositoryException createFailure = expectThrows(RepositoryException.class,
            () -> client.admin().cluster().prepareCreateSnapshot(repo, existingSnapshot).execute().actionGet());
        assertThat(createFailure.getMessage(), containsString(REPOSITORY_BLOCKED_MESSAGE));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ public void testOverwriteSnapshotInfoBlob() {
try (BlobStoreRepository repository =
new MockEventuallyConsistentRepository(metaData, xContentRegistry(), clusterService, blobStoreContext, random())) {
clusterService.addStateApplier(event -> repository.updateState(event.state()));
// Apply state once to initialize repo properly like RepositoriesService would
repository.updateState(clusterService.state());
repository.start();

// We create a snap- blob for snapshot "foo" in the first generation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public void setUp() throws Exception {
@Override
public void tearDown() throws Exception {
deleteAndAssertEmpty(getRepository().basePath());
client().admin().cluster().prepareDeleteRepository("test-repo").get();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't reuse the repo across tests that clear out its content any longer

super.tearDown();
}

Expand Down Expand Up @@ -169,8 +170,6 @@ protected void assertBlobsByPrefix(BlobPath path, String prefix, Map<String, Blo
}

public void testCleanup() throws Exception {
createRepository("test-repo");

createIndex("test-idx-1");
createIndex("test-idx-2");
createIndex("test-idx-3");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.elasticsearch.repositories.blobstore;

import org.apache.lucene.util.SameThreadExecutorService;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterChangedEvent;
Expand All @@ -29,6 +30,8 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -69,6 +72,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static org.elasticsearch.test.ESTestCase.buildNewFakeTransportAddress;
import static org.elasticsearch.test.ESTestCase.randomIntBetween;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
Expand Down Expand Up @@ -326,7 +330,11 @@ private static ClusterService mockClusterService(ClusterState initialState) {
final ClusterService clusterService = mock(ClusterService.class);
final ClusterApplierService clusterApplierService = mock(ClusterApplierService.class);
when(clusterService.getClusterApplierService()).thenReturn(clusterApplierService);
final AtomicReference<ClusterState> currentState = new AtomicReference<>(initialState);
// Setting local node as master so it may update the repository metadata in the cluster state
final DiscoveryNode localNode = new DiscoveryNode("", buildNewFakeTransportAddress(), Version.CURRENT);
final AtomicReference<ClusterState> currentState = new AtomicReference<>(
ClusterState.builder(initialState).nodes(
DiscoveryNodes.builder().add(localNode).masterNodeId(localNode.getId()).localNodeId(localNode.getId()).build()).build());
when(clusterService.state()).then(invocationOnMock -> currentState.get());
final List<ClusterStateApplier> appliers = new CopyOnWriteArrayList<>();
doAnswer(invocation -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.uid.Versions;
Expand Down Expand Up @@ -352,8 +353,12 @@ private Environment createEnvironment() {
private Repository createRepository() {
Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build();
RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings);
return new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(),
BlobStoreTestUtil.mockClusterService(repositoryMetaData));
final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetaData);
final Repository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), clusterService);
clusterService.addStateApplier(e -> repository.updateState(e.state()));
// Apply state once to initialize repo properly like RepositoriesService would
repository.updateState(clusterService.state());
return repository;
}

private static void runAsSnapshot(ThreadPool pool, Runnable runnable) {
Expand Down