Skip to content

Commit

Permalink
Add repository metadata integrity check API
Browse files Browse the repository at this point in the history
  • Loading branch information
DaveCTurner committed Dec 14, 2022
1 parent a3f8abb commit f861f21
Show file tree
Hide file tree
Showing 14 changed files with 1,182 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.repositories.blobstore;

import org.elasticsearch.action.admin.cluster.repositories.integrity.VerifyRepositoryIntegrityAction;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshotsIntegritySuppressor;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryVerificationException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
import org.elasticsearch.test.CorruptionUtils;
import org.junit.After;
import org.junit.Before;

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;

public class BlobStoreMetadataIntegrityIT extends AbstractSnapshotIntegTestCase {

private static final String REPOSITORY_NAME = "test-repo";

private Releasable integrityCheckSuppressor;

@Before
public void suppressIntegrityChecks() {
disableRepoConsistencyCheck("testing integrity checks involves breaking the repo");
assertNull(integrityCheckSuppressor);
integrityCheckSuppressor = new BlobStoreIndexShardSnapshotsIntegritySuppressor();
}

@After
public void enableIntegrityChecks() {
Releasables.closeExpectNoException(integrityCheckSuppressor);
integrityCheckSuppressor = null;
}

public void testIntegrityCheck() throws Exception {
final var repoPath = randomRepoPath();
createRepository(
REPOSITORY_NAME,
"fs",
Settings.builder().put(BlobStoreRepository.SUPPORT_URL_REPO.getKey(), false).put("location", repoPath)
);
final var repository = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(REPOSITORY_NAME);

final var indexCount = between(1, 3);
for (int i = 0; i < indexCount; i++) {
createIndexWithRandomDocs("test-index-" + i, between(1, 1000));
}

for (int snapshotIndex = 0; snapshotIndex < 4; snapshotIndex++) {
final var indexRequests = new ArrayList<IndexRequestBuilder>();
for (int i = 0; i < indexCount; i++) {
if (randomBoolean()) {
final var indexName = "test-index-" + i;
if (randomBoolean()) {
assertAcked(client().admin().indices().prepareDelete(indexName));
createIndexWithRandomDocs(indexName, between(1, 1000));
}
final var numDocs = between(1, 1000);
for (int doc = 0; doc < numDocs; doc++) {
indexRequests.add(client().prepareIndex(indexName).setSource("field1", "bar " + doc));
}
}
}
indexRandom(true, indexRequests);
assertEquals(0, client().admin().indices().prepareFlush().get().getFailedShards());
createFullSnapshot(REPOSITORY_NAME, "test-snapshot-" + snapshotIndex);
}

final var request = new VerifyRepositoryIntegrityAction.Request(REPOSITORY_NAME, 5, 5, 5, 5, 10000, false);

final var response = PlainActionFuture.<VerifyRepositoryIntegrityAction.Response, RuntimeException>get(
listener -> client().execute(VerifyRepositoryIntegrityAction.INSTANCE, request, listener),
30,
TimeUnit.SECONDS
);
assertThat(response.getRestStatus(), equalTo(RestStatus.OK));
assertThat(response.getExceptions(), empty());

final var tempDir = createTempDir();

final List<Path> blobs;
try (var paths = Files.walk(repoPath)) {
blobs = paths.filter(Files::isRegularFile).sorted().toList();
}
for (final var blob : blobs) {
logger.info("repo contents: {}", blob);
}

final var repositoryDataFuture = new PlainActionFuture<RepositoryData>();
repository.getRepositoryData(repositoryDataFuture);
final var repositoryData = repositoryDataFuture.get();
final var repositoryDataBlob = repoPath.resolve("index-" + repositoryData.getGenId());

for (int i = 0; i < 2000; i++) {
final var blobToDamage = randomFrom(blobs);
final var isDataBlob = blobToDamage.getFileName().toString().startsWith(BlobStoreRepository.UPLOADED_DATA_BLOB_PREFIX);
final var isIndexBlob = blobToDamage.equals(repositoryDataBlob);
if (isDataBlob || isIndexBlob || randomBoolean()) {
logger.info("--> deleting {}", blobToDamage);
Files.move(blobToDamage, tempDir.resolve("tmp"));
} else {
logger.info("--> corrupting {}", blobToDamage);
Files.copy(blobToDamage, tempDir.resolve("tmp"));
CorruptionUtils.corruptFile(random(), blobToDamage);
}
try {
final var isCancelled = new AtomicBoolean();

final var verificationResponse = PlainActionFuture.get(
(PlainActionFuture<List<RepositoryVerificationException>> listener) -> repository.verifyMetadataIntegrity(
request,
listener,
() -> {
if (rarely() && rarely()) {
isCancelled.set(true);
return true;
}
return isCancelled.get();
}
),
30,
TimeUnit.SECONDS
);
assertThat(verificationResponse, not(empty()));
final var responseString = verificationResponse.stream().map(Throwable::getMessage).collect(Collectors.joining("\n"));
if (isCancelled.get()) {
assertThat(responseString, containsString("verification task cancelled before completion"));
}
if (isDataBlob && isCancelled.get() == false) {
assertThat(
responseString,
allOf(containsString(blobToDamage.getFileName().toString()), containsString("missing blob"))
);
}
} catch (RepositoryException e) {
// ok, this means e.g. we couldn't even read the index blob
} finally {
Files.deleteIfExists(blobToDamage);
Files.move(tempDir.resolve("tmp"), blobToDamage);
}

final var repairResponse = PlainActionFuture.<VerifyRepositoryIntegrityAction.Response, RuntimeException>get(
listener -> client().execute(VerifyRepositoryIntegrityAction.INSTANCE, request, listener),
30,
TimeUnit.SECONDS
);
assertThat(repairResponse.getRestStatus(), equalTo(RestStatus.OK));
assertThat(repairResponse.getExceptions(), empty());
}
}
}
1 change: 1 addition & 0 deletions server/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
exports org.elasticsearch.action.admin.cluster.repositories.cleanup;
exports org.elasticsearch.action.admin.cluster.repositories.delete;
exports org.elasticsearch.action.admin.cluster.repositories.get;
exports org.elasticsearch.action.admin.cluster.repositories.integrity;
exports org.elasticsearch.action.admin.cluster.repositories.put;
exports org.elasticsearch.action.admin.cluster.repositories.verify;
exports org.elasticsearch.action.admin.cluster.reroute;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.elasticsearch.action.admin.cluster.repositories.delete.TransportDeleteRepositoryAction;
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesAction;
import org.elasticsearch.action.admin.cluster.repositories.get.TransportGetRepositoriesAction;
import org.elasticsearch.action.admin.cluster.repositories.integrity.VerifyRepositoryIntegrityAction;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryAction;
import org.elasticsearch.action.admin.cluster.repositories.put.TransportPutRepositoryAction;
import org.elasticsearch.action.admin.cluster.repositories.verify.TransportVerifyRepositoryAction;
Expand Down Expand Up @@ -336,6 +337,7 @@
import org.elasticsearch.rest.action.admin.cluster.RestSnapshottableFeaturesAction;
import org.elasticsearch.rest.action.admin.cluster.RestUpdateDesiredNodesAction;
import org.elasticsearch.rest.action.admin.cluster.RestVerifyRepositoryAction;
import org.elasticsearch.rest.action.admin.cluster.RestVerifyRepositoryIntegrityAction;
import org.elasticsearch.rest.action.admin.cluster.dangling.RestDeleteDanglingIndexAction;
import org.elasticsearch.rest.action.admin.cluster.dangling.RestImportDanglingIndexAction;
import org.elasticsearch.rest.action.admin.cluster.dangling.RestListDanglingIndicesAction;
Expand Down Expand Up @@ -588,6 +590,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(GetRepositoriesAction.INSTANCE, TransportGetRepositoriesAction.class);
actions.register(DeleteRepositoryAction.INSTANCE, TransportDeleteRepositoryAction.class);
actions.register(VerifyRepositoryAction.INSTANCE, TransportVerifyRepositoryAction.class);
actions.register(VerifyRepositoryIntegrityAction.INSTANCE, VerifyRepositoryIntegrityAction.TransportAction.class);
actions.register(CleanupRepositoryAction.INSTANCE, TransportCleanupRepositoryAction.class);
actions.register(GetSnapshotsAction.INSTANCE, TransportGetSnapshotsAction.class);
actions.register(DeleteSnapshotAction.INSTANCE, TransportDeleteSnapshotAction.class);
Expand Down Expand Up @@ -757,6 +760,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestGetRepositoriesAction(settingsFilter));
registerHandler.accept(new RestDeleteRepositoryAction());
registerHandler.accept(new RestVerifyRepositoryAction());
registerHandler.accept(new RestVerifyRepositoryIntegrityAction());
registerHandler.accept(new RestCleanupRepositoryAction());
registerHandler.accept(new RestGetSnapshotsAction());
registerHandler.accept(new RestCreateSnapshotAction());
Expand Down
Loading

0 comments on commit f861f21

Please sign in to comment.