diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java index 2788bac6600b9..80ad0fe2631e6 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java @@ -209,6 +209,7 @@ static ClientConfiguration buildConfiguration(S3ClientSettings clientSettings) { clientConfiguration.setMaxErrorRetry(clientSettings.maxRetries); clientConfiguration.setUseThrottleRetries(clientSettings.throttleRetries); clientConfiguration.setSocketTimeout(clientSettings.readTimeoutMillis); + clientConfiguration.setMaxConnections(200); // TODO WIP reduce this return clientConfiguration; } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreMetadataIntegrityIT.java b/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreMetadataIntegrityIT.java new file mode 100644 index 0000000000000..47a2659aa92c3 --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreMetadataIntegrityIT.java @@ -0,0 +1,348 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.repositories.blobstore; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.admin.cluster.repositories.integrity.VerifyRepositoryIntegrityAction; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; +import org.elasticsearch.index.query.ExistsQueryBuilder; +import org.elasticsearch.index.query.TermQueryBuilder; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshotsIntegritySuppressor; +import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase; +import org.elasticsearch.snapshots.SnapshotState; +import org.elasticsearch.snapshots.mockstore.MockRepository; +import org.elasticsearch.tasks.TaskInfo; +import org.elasticsearch.test.CorruptionUtils; +import org.elasticsearch.test.junit.annotations.TestLogging; +import org.junit.After; +import org.junit.Before; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.not; + +public class BlobStoreMetadataIntegrityIT extends AbstractSnapshotIntegTestCase { + + private static 
final String REPOSITORY_NAME = "test-repo"; + + private Releasable integrityCheckSuppressor; + + @Before + public void suppressIntegrityChecks() { + disableRepoConsistencyCheck("testing integrity checks involves breaking the repo"); + assertNull(integrityCheckSuppressor); + integrityCheckSuppressor = new BlobStoreIndexShardSnapshotsIntegritySuppressor(); + } + + @After + public void enableIntegrityChecks() { + Releasables.closeExpectNoException(integrityCheckSuppressor); + integrityCheckSuppressor = null; + } + + @TestLogging(reason = "testing", value = "org.elasticsearch.repositories.blobstore.MetadataVerifier:DEBUG") + public void testIntegrityCheck() throws Exception { + final var repoPath = randomRepoPath(); + createRepository( + REPOSITORY_NAME, + "mock", + Settings.builder().put(BlobStoreRepository.SUPPORT_URL_REPO.getKey(), false).put("location", repoPath) + ); + final MockRepository repository = (MockRepository) internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class) + .repository(REPOSITORY_NAME); + + final var indexCount = between(1, 3); + for (int i = 0; i < indexCount; i++) { + createIndexWithRandomDocs("test-index-" + i, between(1, 1000)); + } + + final var snapshotCount = between(2, 4); + for (int snapshotIndex = 0; snapshotIndex < snapshotCount; snapshotIndex++) { + final var indexRequests = new ArrayList<IndexRequestBuilder>(); + for (int i = 0; i < indexCount; i++) { + if (randomBoolean()) { + final var indexName = "test-index-" + i; + if (randomBoolean()) { + assertAcked(client().admin().indices().prepareDelete(indexName)); + createIndexWithRandomDocs(indexName, between(1, 1000)); + } + final var numDocs = between(1, 1000); + for (int doc = 0; doc < numDocs; doc++) { + indexRequests.add(client().prepareIndex(indexName).setSource("field1", "bar " + doc)); + } + } + } + indexRandom(true, indexRequests); + assertEquals(0, client().admin().indices().prepareFlush().get().getFailedShards()); + final var snapshotInfo = clusterAdmin().prepareCreateSnapshot(REPOSITORY_NAME, "test-snapshot-" + snapshotIndex) + .setIncludeGlobalState(randomBoolean()) + .setWaitForCompletion(true) + .get() + .getSnapshotInfo(); + assertThat(snapshotInfo.successfulShards(), is(snapshotInfo.totalShards())); + assertThat(snapshotInfo.state(), is(SnapshotState.SUCCESS)); + } + + repository.setBlockOnReadIndexMeta(); + + final var tasksFuture = new PlainActionFuture<List<TaskInfo>>(); + repository.threadPool().generic().execute(() -> { + try { + assertBusy(() -> assertTrue(repository.blocked())); + } catch (Exception e) { + throw new AssertionError(e); + } + + ActionListener.completeWith( + tasksFuture, + () -> client().admin() + .cluster() + .prepareListTasks() + .setDetailed(true) + .get() + .getTasks() + .stream() + .filter(t -> t.action().equals(VerifyRepositoryIntegrityAction.NAME) && t.status() != null) + .toList() + ); + + repository.unblock(); + }); + + verifyAndAssertSuccessful(indexCount); + + final var tasks = tasksFuture.actionGet(30, TimeUnit.SECONDS); + assertThat(tasks, not(empty())); + for (TaskInfo task : tasks) { + if (task.status()instanceof VerifyRepositoryIntegrityAction.Status status) { + assertEquals(REPOSITORY_NAME, status.repositoryName()); + assertThat(status.repositoryGeneration(), greaterThan(0L)); + assertEquals(snapshotCount, status.snapshotCount()); + assertEquals(snapshotCount, status.snapshotsVerified()); + assertEquals(indexCount, status.indexCount()); + assertEquals(0, status.indicesVerified()); + assertThat(status.indexSnapshotCount(), greaterThanOrEqualTo((long) indexCount)); +
assertEquals(0, status.indexSnapshotsVerified()); + assertEquals(0, status.anomalyCount()); + } else { + assert false : Strings.toString(task); + } + } + + final var tempDir = createTempDir(); + + final var repositoryData = PlainActionFuture.get(repository::getRepositoryData, 10, TimeUnit.SECONDS); + final var repositoryDataBlob = repoPath.resolve("index-" + repositoryData.getGenId()); + + final List blobs; + try (var paths = Files.walk(repoPath)) { + blobs = paths.filter(path -> Files.isRegularFile(path) && path.equals(repositoryDataBlob) == false).sorted().toList(); + } + + for (int i = 0; i < 2000; i++) { + final var blobToDamage = randomFrom(blobs); + final var isDataBlob = blobToDamage.getFileName().toString().startsWith(BlobStoreRepository.UPLOADED_DATA_BLOB_PREFIX); + final var truncate = randomBoolean(); + if (truncate) { + logger.info("--> truncating {}", blobToDamage); + Files.copy(blobToDamage, tempDir.resolve("tmp")); + Files.write(blobToDamage, new byte[0]); + } else if (isDataBlob || randomBoolean()) { + logger.info("--> deleting {}", blobToDamage); + Files.move(blobToDamage, tempDir.resolve("tmp")); + } else { + logger.info("--> corrupting {}", blobToDamage); + Files.copy(blobToDamage, tempDir.resolve("tmp")); + CorruptionUtils.corruptFile(random(), blobToDamage); + } + try { + // TODO include some cancellation tests + + final var anomalies = verifyAndGetAnomalies(indexCount, repoPath.relativize(blobToDamage)); + if (isDataBlob) { + final var expectedAnomaly = truncate + ? MetadataVerifier.Anomaly.MISMATCHED_BLOB_LENGTH + : MetadataVerifier.Anomaly.MISSING_BLOB; + var foundExpectedAnomaly = false; + for (SearchHit anomaly : anomalies) { + final var source = anomaly.getSourceAsMap(); + if (expectedAnomaly.toString().equals(source.get("anomaly")) + && blobToDamage.getFileName().toString().equals(source.get("blob_name"))) { + foundExpectedAnomaly = true; + break; + } + } + assertTrue(foundExpectedAnomaly); + } + + // + // final var isCancelled = new AtomicBoolean(); + // + // final var verificationResponse = PlainActionFuture.get( + // (PlainActionFuture listener) -> repository.verifyMetadataIntegrity( + // client(), + // () -> new RecyclerBytesStreamOutput(NON_RECYCLING_INSTANCE), + // request, + // listener, + // () -> { + // if (rarely() && rarely()) { + // isCancelled.set(true); + // return true; + // } + // return isCancelled.get(); + // } + // ), + // 30, + // TimeUnit.SECONDS + // ); + // for (SearchHit hit : client().prepareSearch("metadata_verification_results").setSize(10000).get().getHits().getHits()) { + // logger.info("--> {}", Strings.toString(hit)); + // } + // assertThat(verificationResponse, not(nullValue())); + // final var responseString = verificationResponse.stream().map(Throwable::getMessage).collect(Collectors.joining("\n")); + // if (isCancelled.get()) { + // assertThat(responseString, containsString("verification task cancelled before completion")); + // } + // if (isDataBlob && isCancelled.get() == false) { + // assertThat( + // responseString, + // allOf(containsString(blobToDamage.getFileName().toString()), containsString("missing blob")) + // ); + // } + } finally { + Files.deleteIfExists(blobToDamage); + Files.move(tempDir.resolve("tmp"), blobToDamage); + } + + verifyAndAssertSuccessful(indexCount); + } + } + + private void verifyAndAssertSuccessful(int indexCount) { + PlainActionFuture.get( + listener -> client().execute( + VerifyRepositoryIntegrityAction.INSTANCE, + new VerifyRepositoryIntegrityAction.Request(REPOSITORY_NAME, Strings.EMPTY_ARRAY, 
0, 0, 0, 0), + listener + ), + 30, + TimeUnit.SECONDS + ); + assertEquals( + 0, + client().prepareSearch("metadata_verification_results") + .setSize(0) + .setQuery(new ExistsQueryBuilder("anomaly")) + .get() + .getHits() + .getTotalHits().value + ); + assertEquals( + indexCount, + client().prepareSearch("metadata_verification_results") + .setSize(0) + .setQuery(new ExistsQueryBuilder("restorability")) + .setTrackTotalHits(true) + .get() + .getHits() + .getTotalHits().value + ); + assertEquals( + indexCount, + client().prepareSearch("metadata_verification_results") + .setSize(0) + .setQuery(new TermQueryBuilder("restorability", "full")) + .setTrackTotalHits(true) + .get() + .getHits() + .getTotalHits().value + ); + assertEquals( + 0, + client().prepareSearch("metadata_verification_results") + .setSize(1) + .setQuery(new TermQueryBuilder("completed", true)) + .get() + .getHits() + .getHits()[0].getSourceAsMap().get("total_anomalies") + ); + assertAcked(client().admin().indices().prepareDelete("metadata_verification_results")); + } + + private SearchHit[] verifyAndGetAnomalies(long indexCount, Path damagedBlob) { + PlainActionFuture.get( + listener -> client().execute( + VerifyRepositoryIntegrityAction.INSTANCE, + new VerifyRepositoryIntegrityAction.Request(REPOSITORY_NAME, Strings.EMPTY_ARRAY, 0, 0, 0, 0), + listener + ), + 30, + TimeUnit.SECONDS + ); + final var anomalyHits = client().prepareSearch("metadata_verification_results") + .setSize(10000) + .setQuery(new ExistsQueryBuilder("anomaly")) + .get() + .getHits(); + assertThat(anomalyHits.getTotalHits().value, greaterThan(0L)); + assertEquals( + indexCount, + client().prepareSearch("metadata_verification_results") + .setSize(0) + .setQuery(new ExistsQueryBuilder("restorability")) + .setTrackTotalHits(true) + .get() + .getHits() + .getTotalHits().value + ); + final var damagedFileName = damagedBlob.getFileName().toString(); + assertThat( + client().prepareSearch("metadata_verification_results") + .setSize(0) + .setQuery(new TermQueryBuilder("restorability", "full")) + .setTrackTotalHits(true) + .get() + .getHits() + .getTotalHits().value, + damagedFileName.startsWith(BlobStoreRepository.SNAPSHOT_PREFIX) + || damagedFileName.startsWith(BlobStoreRepository.UPLOADED_DATA_BLOB_PREFIX) + || (damagedFileName.startsWith(BlobStoreRepository.METADATA_PREFIX) && damagedBlob.startsWith("indices")) + ? 
lessThan(indexCount) + : equalTo(indexCount) + ); + final int totalAnomalies = (int) client().prepareSearch("metadata_verification_results") + .setSize(1) + .setQuery(new TermQueryBuilder("completed", true)) + .get() + .getHits() + .getHits()[0].getSourceAsMap().get("total_anomalies"); + assertThat(totalAnomalies, greaterThan(0)); + assertAcked(client().admin().indices().prepareDelete("metadata_verification_results")); + return anomalyHits.getHits(); + } +} diff --git a/server/src/main/java/module-info.java b/server/src/main/java/module-info.java index 0ced8611f293c..0124059eb756a 100644 --- a/server/src/main/java/module-info.java +++ b/server/src/main/java/module-info.java @@ -70,6 +70,7 @@ exports org.elasticsearch.action.admin.cluster.repositories.cleanup; exports org.elasticsearch.action.admin.cluster.repositories.delete; exports org.elasticsearch.action.admin.cluster.repositories.get; + exports org.elasticsearch.action.admin.cluster.repositories.integrity; exports org.elasticsearch.action.admin.cluster.repositories.put; exports org.elasticsearch.action.admin.cluster.repositories.verify; exports org.elasticsearch.action.admin.cluster.reroute; diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 59d055e27415c..12d43d119cb44 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -60,6 +60,7 @@ import org.elasticsearch.action.admin.cluster.repositories.delete.TransportDeleteRepositoryAction; import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesAction; import org.elasticsearch.action.admin.cluster.repositories.get.TransportGetRepositoriesAction; +import org.elasticsearch.action.admin.cluster.repositories.integrity.VerifyRepositoryIntegrityAction; import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryAction; import org.elasticsearch.action.admin.cluster.repositories.put.TransportPutRepositoryAction; import org.elasticsearch.action.admin.cluster.repositories.verify.TransportVerifyRepositoryAction; @@ -336,6 +337,7 @@ import org.elasticsearch.rest.action.admin.cluster.RestSnapshottableFeaturesAction; import org.elasticsearch.rest.action.admin.cluster.RestUpdateDesiredNodesAction; import org.elasticsearch.rest.action.admin.cluster.RestVerifyRepositoryAction; +import org.elasticsearch.rest.action.admin.cluster.RestVerifyRepositoryIntegrityAction; import org.elasticsearch.rest.action.admin.cluster.dangling.RestDeleteDanglingIndexAction; import org.elasticsearch.rest.action.admin.cluster.dangling.RestImportDanglingIndexAction; import org.elasticsearch.rest.action.admin.cluster.dangling.RestListDanglingIndicesAction; @@ -588,6 +590,7 @@ public void reg actions.register(GetRepositoriesAction.INSTANCE, TransportGetRepositoriesAction.class); actions.register(DeleteRepositoryAction.INSTANCE, TransportDeleteRepositoryAction.class); actions.register(VerifyRepositoryAction.INSTANCE, TransportVerifyRepositoryAction.class); + actions.register(VerifyRepositoryIntegrityAction.INSTANCE, VerifyRepositoryIntegrityAction.TransportAction.class); actions.register(CleanupRepositoryAction.INSTANCE, TransportCleanupRepositoryAction.class); actions.register(GetSnapshotsAction.INSTANCE, TransportGetSnapshotsAction.class); actions.register(DeleteSnapshotAction.INSTANCE, TransportDeleteSnapshotAction.class); @@ -757,6 +760,7 @@ public void initRestHandlers(Supplier nodesInCluster) { 
registerHandler.accept(new RestGetRepositoriesAction(settingsFilter)); + registerHandler.accept(new RestDeleteRepositoryAction()); + registerHandler.accept(new RestVerifyRepositoryAction()); + registerHandler.accept(new RestVerifyRepositoryIntegrityAction()); + registerHandler.accept(new RestCleanupRepositoryAction()); + registerHandler.accept(new RestGetSnapshotsAction()); + registerHandler.accept(new RestCreateSnapshotAction()); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/integrity/VerifyRepositoryIntegrityAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/integrity/VerifyRepositoryIntegrityAction.java new file mode 100644 index 0000000000000..31c8c4edba25e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/integrity/VerifyRepositoryIntegrityAction.java @@ -0,0 +1,323 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.admin.cluster.repositories.integrity; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.MasterNodeReadRequest; +import org.elasticsearch.action.support.master.TransportMasterNodeReadAction; +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Supplier; + +public class VerifyRepositoryIntegrityAction extends ActionType<ActionResponse.Empty> { + + public static final VerifyRepositoryIntegrityAction INSTANCE = new VerifyRepositoryIntegrityAction(); + public static final String NAME = "cluster:admin/repository/verify_integrity"; + + private VerifyRepositoryIntegrityAction() { + super(NAME, in -> ActionResponse.Empty.INSTANCE); + } + + public static class Request extends MasterNodeReadRequest<Request> { + + private final String repository; + private final String[] indices; + private final int threadPoolConcurrency; + private final int snapshotVerificationConcurrency; + private final int indexVerificationConcurrency; + private final int indexSnapshotVerificationConcurrency; + + public Request( + String repository, + String[] indices, + int
threadPoolConcurrency, + int snapshotVerificationConcurrency, + int indexVerificationConcurrency, + int indexSnapshotVerificationConcurrency + ) { + this.repository = repository; + this.indices = Objects.requireNonNull(indices, "indices"); + this.threadPoolConcurrency = requireNonNegative("threadPoolConcurrency", threadPoolConcurrency); + this.snapshotVerificationConcurrency = requireNonNegative("snapshotVerificationConcurrency", snapshotVerificationConcurrency); + this.indexVerificationConcurrency = requireNonNegative("indexVerificationConcurrency", indexVerificationConcurrency); + this.indexSnapshotVerificationConcurrency = requireNonNegative( + "indexSnapshotVerificationConcurrency", + indexSnapshotVerificationConcurrency + ); + } + + private static int requireNonNegative(String name, int value) { + if (value < 0) { + throw new IllegalArgumentException("argument [" + name + "] must be at least [0]"); + } + return value; + } + + public Request(StreamInput in) throws IOException { + super(in); + this.repository = in.readString(); + this.indices = in.readStringArray(); + this.threadPoolConcurrency = in.readVInt(); + this.snapshotVerificationConcurrency = in.readVInt(); + this.indexVerificationConcurrency = in.readVInt(); + this.indexSnapshotVerificationConcurrency = in.readVInt(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(repository); + out.writeStringArray(indices); + out.writeVInt(threadPoolConcurrency); + out.writeVInt(snapshotVerificationConcurrency); + out.writeVInt(indexVerificationConcurrency); + out.writeVInt(indexSnapshotVerificationConcurrency); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) { + return new VerifyRepositoryIntegrityAction.Task(id, type, action, getDescription(), parentTaskId, headers); + } + + public String getRepository() { + return repository; + } + + public String[] getIndices() { + return indices; + } + + public int getThreadPoolConcurrency() { + return threadPoolConcurrency; + } + + public int getSnapshotVerificationConcurrency() { + return snapshotVerificationConcurrency; + } + + public int getIndexVerificationConcurrency() { + return indexVerificationConcurrency; + } + + public int getIndexSnapshotVerificationConcurrency() { + return indexSnapshotVerificationConcurrency; + } + + public Request withDefaultThreadpoolConcurrency(ThreadPool.Info threadPoolInfo) { + if (threadPoolConcurrency > 0 + && snapshotVerificationConcurrency > 0 + && indexVerificationConcurrency > 0 + && indexSnapshotVerificationConcurrency > 0) { + return this; + } + + final var maxThreads = Math.max(1, threadPoolInfo.getMax()); + final var halfMaxThreads = Math.max(1, maxThreads / 2); + final var request = new Request( + repository, + indices, + threadPoolConcurrency > 0 ? threadPoolConcurrency : halfMaxThreads, + snapshotVerificationConcurrency > 0 ? snapshotVerificationConcurrency : halfMaxThreads, + indexVerificationConcurrency > 0 ? indexVerificationConcurrency : maxThreads, + indexSnapshotVerificationConcurrency > 0 ?
indexSnapshotVerificationConcurrency : 1 + ); + request.masterNodeTimeout(masterNodeTimeout()); + return request; + } + } + + public record Status( + String repositoryName, + long repositoryGeneration, + String repositoryUUID, + long snapshotCount, + long snapshotsVerified, + long indexCount, + long indicesVerified, + long indexSnapshotCount, + long indexSnapshotsVerified, + long anomalyCount + ) implements org.elasticsearch.tasks.Task.Status { + + public static String NAME = "verify_repository_status"; + + public Status(StreamInput in) throws IOException { + this( + in.readString(), + in.readVLong(), + in.readString(), + in.readVLong(), + in.readVLong(), + in.readVLong(), + in.readVLong(), + in.readVLong(), + in.readVLong(), + in.readVLong() + ); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(repositoryName); + out.writeVLong(repositoryGeneration); + out.writeString(repositoryUUID); + out.writeVLong(snapshotCount); + out.writeVLong(snapshotsVerified); + out.writeVLong(indexCount); + out.writeVLong(indicesVerified); + out.writeVLong(indexSnapshotCount); + out.writeVLong(indexSnapshotsVerified); + out.writeVLong(anomalyCount); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.startObject("repository"); + builder.field("name", repositoryName); + builder.field("uuid", repositoryUUID); + builder.field("generation", repositoryGeneration); + builder.endObject(); + builder.startObject("snapshots"); + builder.field("verified", snapshotsVerified); + builder.field("total", snapshotCount); + builder.endObject(); + builder.startObject("indices"); + builder.field("verified", indicesVerified); + builder.field("total", indexCount); + builder.endObject(); + builder.startObject("index_snapshots"); + builder.field("verified", indexSnapshotsVerified); + builder.field("total", indexSnapshotCount); + builder.endObject(); + builder.field("anomalies", anomalyCount); + builder.endObject(); + return builder; + } + + @Override + public String getWriteableName() { + return NAME; + } + } + + public static class Task extends CancellableTask { + + private volatile Supplier<Status> statusSupplier; + + public Task(long id, String type, String action, String description, TaskId parentTaskId, Map<String, String> headers) { + super(id, type, action, description, parentTaskId, headers); + } + + public void setStatusSupplier(Supplier<Status> statusSupplier) { + this.statusSupplier = statusSupplier; + } + + @Override + public Status getStatus() { + return Optional.ofNullable(statusSupplier).map(Supplier::get).orElse(null); + } + } + + public static class TransportAction extends TransportMasterNodeReadAction<Request, ActionResponse.Empty> { + + private final RepositoriesService repositoriesService; + private final NodeClient client; + + @Inject + public TransportAction( + TransportService transportService, + ClusterService clusterService, + RepositoriesService repositoriesService, + ThreadPool threadPool, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + NodeClient client + ) { + super( + NAME, + true, + transportService, + clusterService, + threadPool, + actionFilters, + Request::new, + indexNameExpressionResolver, + in -> ActionResponse.Empty.INSTANCE, + ThreadPool.Names.SNAPSHOT_META + ); + this.repositoriesService = repositoriesService; + this.client = client; + } + + @Override + protected ClusterBlockException checkBlock(Request request, ClusterState state) { + return
state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); + } + + @Override + protected void masterOperation( + org.elasticsearch.tasks.Task task, + Request request, + ClusterState state, + ActionListener<ActionResponse.Empty> listener + ) throws Exception { + // TODO add mechanism to block blob deletions while this is running + final var verifyTask = (Task) task; + + final ClusterStateListener noLongerMasterListener = event -> { + if (event.localNodeMaster() == false) { + transportService.getTaskManager().cancel(verifyTask, "no longer master", () -> {}); + } + }; + clusterService.addListener(noLongerMasterListener); + + repositoriesService.repository(request.getRepository()) + .verifyMetadataIntegrity( + client, + transportService::newNetworkBytesStream, + request.withDefaultThreadpoolConcurrency(clusterService.threadPool().info(ThreadPool.Names.SNAPSHOT_META)), + ActionListener.runAfter( + listener.map(ignored -> ActionResponse.Empty.INSTANCE), + () -> clusterService.removeListener(noLongerMasterListener) + ), + verifyTask::isCancelled, + verifyTask::setStatusSupplier + ); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/ThrottledIterator.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/ThrottledIterator.java new file mode 100644 index 0000000000000..3df6629a272c1 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/ThrottledIterator.java @@ -0,0 +1,152 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.common.util.concurrent; + +import org.elasticsearch.core.AbstractRefCounted; +import org.elasticsearch.core.RefCounted; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Strings; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; + +import java.util.Iterator; +import java.util.Objects; +import java.util.concurrent.Semaphore; +import java.util.function.BiConsumer; + +public class ThrottledIterator<T> implements Releasable { + + private static final Logger logger = LogManager.getLogger(ThrottledIterator.class); + + /** + * Iterate through the given collection, performing an operation on each item which may fork background tasks, but with a limit on the + * number of such background tasks running concurrently to avoid overwhelming the rest of the system (e.g. starving other work of access + * to an executor). + * + * @param iterator The items to iterate. May be accessed by multiple threads, but accesses are all protected by synchronizing on itself. + * @param itemConsumer The operation to perform on each item. Each operation receives a {@link RefCounted} which can be used to track + * the execution of any background tasks spawned for this item. This operation may run on the thread which + * originally called {@link #run}, if this method has not yet returned. Otherwise it will run on a thread on which a + * background task previously called {@link RefCounted#decRef()} on its ref count. This operation should not throw + * any exceptions. + * @param maxConcurrency The maximum number of ongoing operations at any time.
+ * @param onItemCompletion Executed when each item is completed, which can be used for instance to report on progress. Must not throw + * exceptions. + * @param onCompletion Executed when all items are completed. + */ + public static <T> void run( + Iterator<T> iterator, + BiConsumer<RefCounted, T> itemConsumer, + int maxConcurrency, + Runnable onItemCompletion, + Runnable onCompletion + ) { + try (var throttledIterator = new ThrottledIterator<>(iterator, itemConsumer, maxConcurrency, onItemCompletion, onCompletion)) { + throttledIterator.run(); + } + } + + private final RefCounted throttleRefs; + private final Iterator<T> iterator; + private final BiConsumer<RefCounted, T> itemConsumer; + private final Semaphore permits; + private final Runnable onItemCompletion; + + private ThrottledIterator( + Iterator<T> iterator, + BiConsumer<RefCounted, T> itemConsumer, + int maxConcurrency, + Runnable onItemCompletion, + Runnable onCompletion + ) { + this.iterator = Objects.requireNonNull(iterator); + this.itemConsumer = Objects.requireNonNull(itemConsumer); + if (maxConcurrency <= 0) { + throw new IllegalArgumentException("maxConcurrency must be positive"); + } + this.permits = new Semaphore(maxConcurrency); + this.onItemCompletion = Objects.requireNonNull(onItemCompletion); + this.throttleRefs = AbstractRefCounted.of(onCompletion); + } + + private void run() { + while (permits.tryAcquire()) { + final T item; + synchronized (iterator) { + if (iterator.hasNext()) { + item = iterator.next(); + } else { + permits.release(); + return; + } + } + try (var itemRefs = new ItemRefCounted()) { + itemConsumer.accept(itemRefs, item); + } catch (Exception e) { + logger.error(Strings.format("exception when processing [%s] with [%s]", item, itemConsumer), e); + assert false : e; + } + } + } + + @Override + public void close() { + throttleRefs.decRef(); + } + + // A RefCounted for a single item, including protection against calling back into run() if it's created and closed within a single + // invocation of run(). + private class ItemRefCounted extends AbstractRefCounted implements Releasable { + private boolean isRecursive = true; + + ItemRefCounted() { + throttleRefs.incRef(); + } + + @Override + protected void closeInternal() { + try { + onItemCompletion.run(); + } catch (Exception e) { + logger.error("exception in onItemCompletion", e); + assert false : e; + } finally { + permits.release(); + try { + // Someone must now pick up the next item. Here we might be called from the run() invocation which started processing + // the just-completed item (via close() -> decRef()) if that item's processing didn't fork or all its forked tasks + // finished first. If so, there's no need to call run() here, we can just return and the next iteration of the run() + // loop will continue the processing; moreover calling run() in this situation could lead to a stack overflow. However + // if we're not within that run() invocation then ... + if (isRecursive() == false) { + // ... we're not within any other run() invocation either, so it's safe (and necessary) to call run() here. + run(); + } + } finally { + throttleRefs.decRef(); + } + } + } + + // Note on blocking: we call both of these synchronized methods exactly once (and must enter close() before calling isRecursive()). + // If close() releases the last ref and calls closeInternal(), and hence isRecursive(), then there's no other threads involved and + // hence no blocking. In contrast if close() doesn't release the last ref then it exits immediately, so the call to isRecursive() + // will proceed without delay in this case too.
+ + private synchronized boolean isRecursive() { + return isRecursive; + } + + @Override + public synchronized void close() { + decRef(); + isRecursive = false; + } + } +} diff --git a/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshot.java b/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshot.java index baf0c9ad08059..7f5c50ceaf421 100644 --- a/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshot.java +++ b/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshot.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.XContentParserUtils; import org.elasticsearch.core.Nullable; +import org.elasticsearch.gateway.CorruptStateException; import org.elasticsearch.index.store.StoreFileMetadata; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContentFragment; @@ -318,7 +319,10 @@ public static FileInfo fromXContent(XContentParser parser) throws IOException { } case WRITER_UUID -> { writerUuid = new BytesRef(parser.binaryValue()); - assert writerUuid.length > 0; + assert BlobStoreIndexShardSnapshots.INTEGRITY_ASSERTIONS_ENABLED == false || writerUuid.length > 0; + if (writerUuid.length == 0) { + throw new ElasticsearchParseException("invalid (empty) writer uuid"); + } } default -> XContentParserUtils.throwUnknownField(currentFieldName, parser); } @@ -336,6 +340,11 @@ public static FileInfo fromXContent(XContentParser parser) throws IOException { } else if (checksum == null) { throw new ElasticsearchParseException("missing checksum for name [" + name + "]"); } + try { + org.apache.lucene.util.Version.parse(writtenBy); + } catch (Exception e) { + throw new ElasticsearchParseException("invalid written_by [" + writtenBy + "]"); + } return new FileInfo(name, new StoreFileMetadata(physicalName, length, checksum, writtenBy, metaHash, writerUuid), partSize); } @@ -571,6 +580,13 @@ public static BlobStoreIndexShardSnapshot fromXContent(XContentParser parser) th } } + if (snapshot == null) { + throw new CorruptStateException("snapshot missing"); + } + if (indexVersion < 0) { + throw new CorruptStateException("index version missing or corrupt"); + } + return new BlobStoreIndexShardSnapshot( snapshot, indexVersion, diff --git a/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshots.java b/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshots.java index 113d3c8f28a19..2f022e1c34394 100644 --- a/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshots.java +++ b/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshots.java @@ -256,6 +256,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } + static volatile boolean INTEGRITY_ASSERTIONS_ENABLED = true; + public static BlobStoreIndexShardSnapshots fromXContent(XContentParser parser) throws IOException { XContentParser.Token token = parser.currentToken(); if (token == null) { // New parser @@ -309,7 +311,11 @@ public static BlobStoreIndexShardSnapshots fromXContent(XContentParser parser) t List fileInfosBuilder = new ArrayList<>(); for (String file : entry.v2()) { FileInfo fileInfo = files.get(file); - assert fileInfo != null; + if (fileInfo == null) { + final var exception = new IllegalStateException("shard index 
inconsistent at file [" + file + "]"); + assert INTEGRITY_ASSERTIONS_ENABLED == false : exception; + throw exception; + } + fileInfosBuilder.add(fileInfo); + } + snapshots.add(new SnapshotFiles(entry.v1(), Collections.unmodifiableList(fileInfosBuilder), historyUUIDs.get(entry.v1()))); diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 58b089940b215..b35f2c5f09536 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -532,7 +532,8 @@ protected Node( searchModule.getNamedWriteables().stream(), pluginsService.flatMap(Plugin::getNamedWriteables), ClusterModule.getNamedWriteables().stream(), - SystemIndexMigrationExecutor.getNamedWriteables().stream() + SystemIndexMigrationExecutor.getNamedWriteables().stream(), + RepositoriesService.getNamedWriteables().stream() ).flatMap(Function.identity()).toList(); final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(namedWriteables); NamedXContentRegistry xContentRegistry = new NamedXContentRegistry( diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java index 2f28adcebc98e..1ec81bc6d7923 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.StepListener; import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest; +import org.elasticsearch.action.admin.cluster.repositories.integrity.VerifyRepositoryIntegrityAction; import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; @@ -35,6 +36,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -45,6 +47,7 @@ import org.elasticsearch.index.Index; import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository; import org.elasticsearch.snapshots.Snapshot; +import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -130,6 +133,16 @@ public RepositoriesService( this.preRestoreChecks = preRestoreChecks; } + + public static List<NamedWriteableRegistry.Entry> getNamedWriteables() { + return List.of( + new NamedWriteableRegistry.Entry( + Task.Status.class, + VerifyRepositoryIntegrityAction.Status.NAME, + VerifyRepositoryIntegrityAction.Status::new + ) + ); + } + + /** + * Registers new repository in the cluster + *

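For context: the ThrottledIterator utility added earlier in this diff is the backpressure primitive that MetadataVerifier builds on. A minimal usage sketch follows, under stated assumptions; it is not part of this change, and the items, executor, process and logger names are hypothetical. Per the javadoc above, the consumer must incRef() the supplied RefCounted before forking background work and decRef() it when that work finishes; the concurrency permit is returned, and the next item picked up, only once the last ref is dropped.

ThrottledIterator.run(
    items.iterator(),
    (refs, item) -> {
        refs.incRef(); // keep this item's permit held while the forked task runs
        executor.execute(() -> {
            try {
                process(item); // hypothetical per-item work
            } finally {
                refs.decRef(); // dropping the last ref frees a permit and resumes iteration
            }
        });
    },
    4, // maxConcurrency: at most 4 items in flight at once
    () -> logger.debug("item completed"), // onItemCompletion: e.g. progress reporting
    () -> logger.info("all items completed") // onCompletion
);

If the consumer forks nothing, the try-with-resources inside run() closes the item's initial ref immediately and the loop simply moves on; the ItemRefCounted recursion guard shown above is what prevents that fast path from overflowing the stack.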
diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index d818476c68c2c..c50e3a61dab91 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -9,6 +9,8 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.repositories.integrity.VerifyRepositoryIntegrityAction; +import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetadata; @@ -16,6 +18,7 @@ import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.LifecycleComponent; +import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Nullable; import org.elasticsearch.index.shard.ShardId; @@ -31,8 +34,10 @@ import java.util.Collection; import java.util.List; import java.util.Set; +import java.util.function.BooleanSupplier; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; /** * An interface for interacting with a repository in snapshot and restore. @@ -309,4 +314,15 @@ void cloneShardSnapshot( static boolean assertSnapshotMetaThread() { return ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT_META); } + + default void verifyMetadataIntegrity( + Client client, + Supplier<RecyclerBytesStreamOutput> bytesStreamOutputSupplier, + VerifyRepositoryIntegrityAction.Request request, + ActionListener<Void> listener, + BooleanSupplier isCancelledSupplier, + Consumer<Supplier<VerifyRepositoryIntegrityAction.Status>> statusSupplierConsumer + ) { + listener.onFailure(new UnsupportedOperationException("this repository type does not support metadata integrity verification")); + } } diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java index 089b5a6e639ba..93cc618236c6d 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java @@ -275,6 +275,10 @@ public Collection<SnapshotId> getSnapshotIds() { return snapshotIds.values(); } + + public long getIndexSnapshotCount() { + return indexSnapshots.values().stream().mapToLong(List::size).sum(); + } + /** * @return whether some of the {@link SnapshotDetails} of the given snapshot are missing, due to BwC, so that they must be loaded from * the {@link SnapshotInfo} blob instead.
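The default verifyMetadataIntegrity method above is the hook that BlobStoreRepository overrides below. A minimal caller-side sketch, mirroring TransportAction#masterOperation earlier in this diff; the repositoriesService, client, transportService, threadPool, task and logger variables are assumed to be in scope, and zero concurrency values are filled in from the SNAPSHOT_META pool by withDefaultThreadpoolConcurrency:

var request = new VerifyRepositoryIntegrityAction.Request(
    "my-repo", // repository to verify
    Strings.EMPTY_ARRAY, // empty array means all indices
    0, 0, 0, 0 // concurrency limits; zero means use the computed default
).withDefaultThreadpoolConcurrency(threadPool.info(ThreadPool.Names.SNAPSHOT_META));
repositoriesService.repository("my-repo")
    .verifyMetadataIntegrity(
        client,
        transportService::newNetworkBytesStream, // the Supplier<RecyclerBytesStreamOutput>
        request,
        ActionListener.wrap(
            ignored -> logger.info("verification finished; results are in the metadata_verification_results index"),
            e -> logger.warn("verification failed", e)
        ),
        task::isCancelled, // polled between units of work so cancellation takes effect promptly
        task::setStatusSupplier // exposes live progress through the task management API
    );

While this runs, the task's Status (registered as a NamedWriteable via RepositoriesService#getNamedWriteables above) is visible through the list-tasks API, as exercised by BlobStoreMetadataIntegrityIT.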
diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 079a21f341e2d..489ed34cd938c 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -27,11 +27,13 @@ import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.SingleResultDeduplicator; import org.elasticsearch.action.StepListener; +import org.elasticsearch.action.admin.cluster.repositories.integrity.VerifyRepositoryIntegrityAction; import org.elasticsearch.action.support.CountDownActionListener; import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.action.support.ListenableActionFuture; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.ThreadedActionListener; +import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.RepositoryCleanupInProgress; @@ -58,6 +60,7 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.compress.NotXContentException; import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.store.InputStreamIndexInput; import org.elasticsearch.common.metrics.CounterMetric; @@ -143,6 +146,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BooleanSupplier; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -3540,6 +3544,18 @@ private static void failStoreIfCorrupted(Store store, Exception e) { } } + + @Override + public void verifyMetadataIntegrity( + Client client, + Supplier<RecyclerBytesStreamOutput> bytesStreamOutputSupplier, + VerifyRepositoryIntegrityAction.Request request, + ActionListener<Void> listener, + BooleanSupplier isCancelledSupplier, + Consumer<Supplier<VerifyRepositoryIntegrityAction.Status>> statusSupplierConsumer + ) { + MetadataVerifier.run(this, client, request, isCancelledSupplier, statusSupplierConsumer, listener); + } + public boolean supportURLRepo() { return supportURLRepo; } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/MetadataVerifier.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/MetadataVerifier.java new file mode 100644 index 0000000000000..65ec5e3940234 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/MetadataVerifier.java @@ -0,0 +1,871 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1.
+ */ + +package org.elasticsearch.repositories.blobstore; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.admin.cluster.repositories.integrity.VerifyRepositoryIntegrityAction; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexAction; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.support.ListenableActionFuture; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.common.CheckedSupplier; +import org.elasticsearch.common.blobstore.support.BlobMetadata; +import org.elasticsearch.common.time.DateFormatter; +import org.elasticsearch.common.time.FormatNames; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.ThrottledIterator; +import org.elasticsearch.core.AbstractRefCounted; +import org.elasticsearch.core.CheckedConsumer; +import org.elasticsearch.core.RefCounted; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Strings; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots; +import org.elasticsearch.index.snapshots.blobstore.SnapshotFiles; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.RepositoryData; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.tasks.TaskCancelledException; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentFactory; + +import java.io.IOException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Locale; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; +import java.util.function.BooleanSupplier; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; + +class MetadataVerifier implements Releasable { + private static final Logger logger = LogManager.getLogger(MetadataVerifier.class); + + enum Anomaly { + FAILED_TO_LOAD_GLOBAL_METADATA, + FAILED_TO_LOAD_SHARD_SNAPSHOT, + FAILED_TO_LOAD_INDEX_METADATA, + FAILED_TO_LOAD_SHARD_GENERATION, + UNDEFINED_SHARD_GENERATION, + UNEXPECTED_EXCEPTION, + FILE_IN_SHARD_GENERATION_NOT_SNAPSHOT, + SNAPSHOT_SHARD_GENERATION_MISMATCH, + FILE_IN_SNAPSHOT_NOT_SHARD_GENERATION, + MISMATCHED_VIRTUAL_BLOB_LENGTH, + MISSING_BLOB, + MISMATCHED_BLOB_LENGTH, + UNKNOWN_SNAPSHOT_FOR_INDEX, + } + + public static void run( + 
BlobStoreRepository blobStoreRepository, + Client client, + VerifyRepositoryIntegrityAction.Request verifyRequest, + BooleanSupplier isCancelledSupplier, + Consumer<Supplier<VerifyRepositoryIntegrityAction.Status>> statusSupplierConsumer, + ActionListener<Void> listener + ) { + logger.info("[{}] verifying metadata integrity", blobStoreRepository.getMetadata().name()); + blobStoreRepository.getRepositoryData(listener.delegateFailure((l, repositoryData) -> { + try ( + var metadataVerifier = new MetadataVerifier( + blobStoreRepository, + client, + verifyRequest, + repositoryData, + isCancelledSupplier, + new ActionListener<>() { + @Override + public void onResponse(Long anomalyCount) { + logger.info( + "[{}] completed verifying metadata integrity for index generation [{}]: " + + "repo UUID [{}], cluster UUID [{}], anomalies [{}]", + blobStoreRepository.getMetadata().name(), + repositoryData.getGenId(), + repositoryData.getUuid(), + repositoryData.getClusterUUID(), + anomalyCount + ); + l.onResponse(null); + } + + @Override + public void onFailure(Exception e) { + logger.info( + () -> Strings.format( + "[%s] failed verifying metadata integrity for index generation [%d]: repo UUID [%s], cluster UUID [%s]", + blobStoreRepository.getMetadata().name(), + repositoryData.getGenId(), + repositoryData.getUuid(), + repositoryData.getClusterUUID() + ) + ); + l.onFailure(e); + } + } + ) + ) { + statusSupplierConsumer.accept(metadataVerifier::getStatus); + metadataVerifier.start(); + } + })); + } + + private final BlobStoreRepository blobStoreRepository; + private final Client client; + private final ActionListener<Long> finalListener; + private final RefCounted finalRefs = AbstractRefCounted.of(this::onCompletion); + private final String repositoryName; + private final VerifyRepositoryIntegrityAction.Request verifyRequest; + private final RepositoryData repositoryData; + private final BooleanSupplier isCancelledSupplier; + private final AtomicLong anomalyCount = new AtomicLong(); + private final Map<String, SnapshotDescription> snapshotDescriptionsById = ConcurrentCollections.newConcurrentMap(); + private final Semaphore threadPoolPermits; + private final Queue<Runnable> executorQueue = ConcurrentCollections.newQueue(); + private final Set<String> requestedIndices; + + private final long snapshotCount; + private final AtomicLong snapshotProgress = new AtomicLong(); + private final long indexCount; + private final AtomicLong indexProgress = new AtomicLong(); + private final long indexSnapshotCount; + private final AtomicLong indexSnapshotProgress = new AtomicLong(); + + MetadataVerifier( + BlobStoreRepository blobStoreRepository, + Client client, + VerifyRepositoryIntegrityAction.Request verifyRequest, + RepositoryData repositoryData, + BooleanSupplier isCancelledSupplier, + ActionListener<Long> finalListener + ) { + this.blobStoreRepository = blobStoreRepository; + this.repositoryName = blobStoreRepository.metadata.name(); + this.client = client; + this.verifyRequest = verifyRequest; + this.repositoryData = repositoryData; + this.isCancelledSupplier = isCancelledSupplier; + this.finalListener = finalListener; + this.threadPoolPermits = new Semaphore(Math.max(1, verifyRequest.getThreadPoolConcurrency())); + this.requestedIndices = Set.of(verifyRequest.getIndices()); + + this.snapshotCount = repositoryData.getSnapshotIds().size(); + this.indexCount = repositoryData.getIndices().size(); + this.indexSnapshotCount = repositoryData.getIndexSnapshotCount(); + } + + @Override + public void close() { + finalRefs.decRef(); + } + + private static final String RESULTS_INDEX = "metadata_verification_results"; + +
VerifyRepositoryIntegrityAction.Status getStatus() { + return new VerifyRepositoryIntegrityAction.Status( + repositoryName, + repositoryData.getGenId(), + repositoryData.getUuid(), + snapshotCount, + snapshotProgress.get(), + indexCount, + indexProgress.get(), + indexSnapshotCount, + indexSnapshotProgress.get(), + anomalyCount.get() + ); + } + + private void start() { + logger.info( + "[{}] verifying metadata integrity for index generation [{}]: " + + "repo UUID [{}], cluster UUID [{}], snapshots [{}], indices [{}], index snapshots [{}]", + repositoryName, + repositoryData.getGenId(), + repositoryData.getUuid(), + repositoryData.getClusterUUID(), + snapshotCount, + indexCount, + indexSnapshotCount + ); + + // TODO define (strict) mappings for index + client.admin().indices().prepareCreate(RESULTS_INDEX).execute(makeListener(finalRefs, createIndexResponse -> verifySnapshots())); + } + + private void verifySnapshots() { + runThrottled( + repositoryData.getSnapshotIds().iterator(), + this::verifySnapshot, + verifyRequest.getSnapshotVerificationConcurrency(), + snapshotProgress, + wrapRunnable(finalRefs, this::verifyIndices) + ); + } + + private void verifySnapshot(RefCounted snapshotRefs, SnapshotId snapshotId) { + if (isCancelledSupplier.getAsBoolean()) { + // getSnapshotInfo does its own forking so we must check for cancellation here + return; + } + + blobStoreRepository.getSnapshotInfo(snapshotId, makeListener(snapshotRefs, snapshotInfo -> { + final var snapshotDescription = new SnapshotDescription(snapshotId, snapshotInfo.startTime(), snapshotInfo.endTime()); + snapshotDescriptionsById.put(snapshotId.getUUID(), snapshotDescription); + forkSupply(snapshotRefs, () -> getSnapshotGlobalMetadata(snapshotRefs, snapshotDescription), metadata -> { + // no checks here, loading it is enough + }); + })); + } + + private Metadata getSnapshotGlobalMetadata(RefCounted snapshotRefs, SnapshotDescription snapshotDescription) { + try { + return blobStoreRepository.getSnapshotGlobalMetadata(snapshotDescription.snapshotId()); + } catch (Exception e) { + addAnomaly(Anomaly.FAILED_TO_LOAD_GLOBAL_METADATA, snapshotRefs, (builder, params) -> { + snapshotDescription.writeXContent(builder); + ElasticsearchException.generateFailureXContent(builder, params, e, true); + return builder; + }); + return null; + } + } + + private void verifyIndices() { + runThrottled( + repositoryData.getIndices().values().iterator(), + (refCounted, indexId) -> new IndexVerifier(refCounted, indexId).run(), + verifyRequest.getIndexVerificationConcurrency(), + indexProgress, + wrapRunnable(finalRefs, () -> {}) + ); + } + + private record ShardContainerContents( + Map<String, BlobMetadata> blobsByName, + BlobStoreIndexShardSnapshots blobStoreIndexShardSnapshots + ) {} + + private class IndexVerifier { + private final RefCounted indexRefs; + private final IndexId indexId; + private final Map<Integer, ListenableActionFuture<ShardContainerContents>> shardContainerContentsListener = newConcurrentMap(); + private final Map<String, ListenableActionFuture<IndexDescription>> indexDescriptionListenersByBlobId = newConcurrentMap(); + private final AtomicInteger totalSnapshotCounter = new AtomicInteger(); + private final AtomicInteger restorableSnapshotCounter = new AtomicInteger(); + + IndexVerifier(RefCounted indexRefs, IndexId indexId) { + this.indexRefs = indexRefs; + this.indexId = indexId; + } + + void run() { + if (requestedIndices.isEmpty() == false && requestedIndices.contains(indexId.getName()) == false) { + return; + } + + runThrottled( + repositoryData.getSnapshots(indexId).iterator(), + this::verifyIndexSnapshot, +
+            runThrottled(
+                repositoryData.getSnapshots(indexId).iterator(),
+                this::verifyIndexSnapshot,
+                verifyRequest.getIndexSnapshotVerificationConcurrency(),
+                indexSnapshotProgress,
+                wrapRunnable(indexRefs, () -> recordRestorability(totalSnapshotCounter.get(), restorableSnapshotCounter.get()))
+            );
+        }
+
+        private void recordRestorability(int totalSnapshotCount, int restorableSnapshotCount) {
+            if (isCancelledSupplier.getAsBoolean() == false) {
+                addResult(indexRefs, (builder, params) -> {
+                    writeIndexId(indexId, builder);
+                    builder.field(
+                        "restorability",
+                        totalSnapshotCount == restorableSnapshotCount ? "full" : 0 < restorableSnapshotCount ? "partial" : "none"
+                    );
+                    builder.field("snapshots", totalSnapshotCount);
+                    builder.field("restorable_snapshots", restorableSnapshotCount);
+                    builder.field("unrestorable_snapshots", totalSnapshotCount - restorableSnapshotCount);
+                    return builder;
+                });
+            }
+        }
+
+        private void verifyIndexSnapshot(RefCounted indexSnapshotRefs, SnapshotId snapshotId) {
+            totalSnapshotCounter.incrementAndGet();
+
+            final var snapshotDescription = snapshotDescriptionsById.get(snapshotId.getUUID());
+            if (snapshotDescription == null) {
+                addAnomaly(Anomaly.UNKNOWN_SNAPSHOT_FOR_INDEX, indexSnapshotRefs, (builder, params) -> {
+                    writeIndexId(indexId, builder);
+                    new SnapshotDescription(snapshotId, 0, 0).writeXContent(builder);
+                    return builder;
+                });
+                return;
+            }
+
+            final var indexMetaBlobId = repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, indexId);
+            indexDescriptionListenersByBlobId.computeIfAbsent(indexMetaBlobId, ignored -> {
+                final var indexDescriptionFuture = new ListenableActionFuture<IndexDescription>();
+                forkSupply(() -> {
+                    final var shardCount = getNumberOfShards(indexMetaBlobId, snapshotId);
+                    final var indexDescription = new IndexDescription(indexId, indexMetaBlobId, shardCount);
+                    for (int i = 0; i < shardCount; i++) {
+                        shardContainerContentsListener.computeIfAbsent(i, shardId -> {
+                            final var shardContainerContentsFuture = new ListenableActionFuture<ShardContainerContents>();
+                            forkSupply(
+                                () -> new ShardContainerContents(
+                                    blobStoreRepository.shardContainer(indexId, shardId).listBlobs(),
+                                    getBlobStoreIndexShardSnapshots(indexDescription, shardId)
+                                ),
+                                shardContainerContentsFuture
+                            );
+                            return shardContainerContentsFuture;
+                        });
+                    }
+                    return indexDescription;
+                }, indexDescriptionFuture);
+                return indexDescriptionFuture;
+            }).addListener(makeListener(indexSnapshotRefs, indexDescription -> {
+                final var restorableShardCount = new AtomicInteger();
+                final var shardSnapshotsRefs = AbstractRefCounted.of(wrapRunnable(indexSnapshotRefs, () -> {
+                    if (indexDescription.shardCount() > 0 && indexDescription.shardCount() == restorableShardCount.get()) {
+                        restorableSnapshotCounter.incrementAndGet();
+                    }
+                }));
+                try {
+                    for (int i = 0; i < indexDescription.shardCount(); i++) {
+                        final var shardId = i;
+                        shardContainerContentsListener.get(i)
+                            .addListener(
+                                makeListener(
+                                    shardSnapshotsRefs,
+                                    shardContainerContents -> forkSupply(
+                                        shardSnapshotsRefs,
+                                        () -> getBlobStoreIndexShardSnapshot(
+                                            shardSnapshotsRefs,
+                                            snapshotDescription,
+                                            indexDescription,
+                                            shardId
+                                        ),
+                                        shardSnapshot -> verifyShardSnapshot(
+                                            shardSnapshotsRefs,
+                                            indexDescription,
+                                            snapshotDescription,
+                                            shardId,
+                                            shardContainerContents,
+                                            shardSnapshot,
+                                            restorableShardCount::incrementAndGet
+                                        )
+                                    )
+                                )
+                            );
+                    }
+                } finally {
+                    shardSnapshotsRefs.decRef();
+                }
+            }));
+        }
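+
+        // The ListenableActionFuture maps above deduplicate work: each index metadata blob is loaded once and
+        // each shard container is listed once, however many snapshots share them.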
+
+        private BlobStoreIndexShardSnapshot getBlobStoreIndexShardSnapshot(
+            RefCounted shardSnapshotRefs,
+            SnapshotDescription snapshotDescription,
+            IndexDescription indexDescription,
+            int shardId
+        ) {
+            try {
+                return blobStoreRepository.loadShardSnapshot(
+                    blobStoreRepository.shardContainer(indexId, shardId),
+                    snapshotDescription.snapshotId()
+                );
+            } catch (Exception e) {
+                addAnomaly(Anomaly.FAILED_TO_LOAD_SHARD_SNAPSHOT, shardSnapshotRefs, (builder, params) -> {
+                    snapshotDescription.writeXContent(builder);
+                    indexDescription.writeXContent(builder);
+                    builder.field("shard", shardId);
+                    ElasticsearchException.generateFailureXContent(builder, params, e, true);
+                    return builder;
+                });
+                return null;
+            }
+        }
+
+        private int getNumberOfShards(String indexMetaBlobId, SnapshotId snapshotId) {
+            try {
+                return blobStoreRepository.getSnapshotIndexMetaData(repositoryData, snapshotId, indexId).getNumberOfShards();
+            } catch (Exception e) {
+                addAnomaly(Anomaly.FAILED_TO_LOAD_INDEX_METADATA, indexRefs, (builder, params) -> {
+                    writeIndexId(indexId, builder);
+                    builder.field("metadata_blob", indexMetaBlobId);
+                    ElasticsearchException.generateFailureXContent(builder, params, e, true);
+                    return builder;
+                });
+                return 0;
+            }
+        }
+
+        private BlobStoreIndexShardSnapshots getBlobStoreIndexShardSnapshots(IndexDescription indexDescription, int shardId) {
+            final var shardGen = repositoryData.shardGenerations().getShardGen(indexId, shardId);
+            if (shardGen == null) {
+                addAnomaly(Anomaly.UNDEFINED_SHARD_GENERATION, indexRefs, (builder, params) -> {
+                    indexDescription.writeXContent(builder);
+                    return builder.field("shard", shardId);
+                });
+                return null;
+            }
+            try {
+                return blobStoreRepository.getBlobStoreIndexShardSnapshots(indexId, shardId, shardGen);
+            } catch (Exception e) {
+                addAnomaly(Anomaly.FAILED_TO_LOAD_SHARD_GENERATION, indexRefs, (builder, params) -> {
+                    indexDescription.writeXContent(builder);
+                    builder.field("shard", shardId);
+                    ElasticsearchException.generateFailureXContent(builder, params, e, true);
+                    return builder;
+                });
+                return null;
+            }
+        }
+
+        private void verifyShardSnapshot(
+            RefCounted shardSnapshotRefs,
+            IndexDescription indexDescription,
+            SnapshotDescription snapshotDescription,
+            int shardId,
+            ShardContainerContents shardContainerContents,
+            BlobStoreIndexShardSnapshot shardSnapshot,
+            Runnable runIfRestorable
+        ) {
+            if (shardSnapshot == null) {
+                return;
+            }
+
+            var restorable = true;
+            for (final var fileInfo : shardSnapshot.indexFiles()) {
+                restorable &= verifyFileInfo(
+                    shardSnapshotRefs,
+                    snapshotDescription,
+                    indexDescription,
+                    shardId,
+                    shardContainerContents.blobsByName(),
+                    fileInfo
+                );
+            }
+            if (restorable) {
+                runIfRestorable.run();
+            }
+
+            final var blobStoreIndexShardSnapshots = shardContainerContents.blobStoreIndexShardSnapshots();
+            if (blobStoreIndexShardSnapshots != null) {
+                boolean foundSnapshot = false;
+                for (SnapshotFiles summary : blobStoreIndexShardSnapshots.snapshots()) {
+                    if (summary.snapshot().equals(snapshotDescription.snapshotId().getName())) {
+                        foundSnapshot = true;
+                        verifyConsistentShardFiles(
+                            shardSnapshotRefs,
+                            snapshotDescription,
+                            indexDescription,
+                            shardId,
+                            shardSnapshot,
+                            summary
+                        );
+                        break;
+                    }
+                }
+
+                if (foundSnapshot == false) {
+                    addResult(shardSnapshotRefs, ((builder, params) -> {
+                        snapshotDescription.writeXContent(builder);
+                        indexDescription.writeXContent(builder);
+                        builder.field("shard", shardId);
+                        builder.field("failure", "missing in shard-level summary");
+                        return builder;
+                    }));
+                }
+            }
+        }
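+
+        // Cross-checks the per-snapshot file list against the shard-level summary: every file should appear in
+        // both places with identical metadata, so a mismatch in either direction is recorded as an anomaly.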
+
+        private void verifyConsistentShardFiles(
+            RefCounted shardSnapshotRefs,
+            SnapshotDescription snapshotDescription,
+            IndexDescription indexDescription,
+            int shardId,
+            BlobStoreIndexShardSnapshot shardSnapshot,
+            SnapshotFiles summary
+        ) {
+            final var snapshotFiles = shardSnapshot.indexFiles()
+                .stream()
+                .collect(Collectors.toMap(BlobStoreIndexShardSnapshot.FileInfo::physicalName, Function.identity()));
+
+            for (final var summaryFile : summary.indexFiles()) {
+                final var snapshotFile = snapshotFiles.get(summaryFile.physicalName());
+                if (snapshotFile == null) {
+                    addAnomaly(Anomaly.FILE_IN_SHARD_GENERATION_NOT_SNAPSHOT, shardSnapshotRefs, (builder, params) -> {
+                        snapshotDescription.writeXContent(builder);
+                        indexDescription.writeXContent(builder);
+                        builder.field("shard", shardId);
+                        builder.field("file_name", summaryFile.physicalName());
+                        return builder;
+                    });
+                } else if (summaryFile.isSame(snapshotFile) == false) {
+                    addAnomaly(Anomaly.SNAPSHOT_SHARD_GENERATION_MISMATCH, shardSnapshotRefs, (builder, params) -> {
+                        snapshotDescription.writeXContent(builder);
+                        indexDescription.writeXContent(builder);
+                        builder.field("shard", shardId);
+                        builder.field("file_name", summaryFile.physicalName());
+                        return builder;
+                    });
+                }
+            }
+
+            final var summaryFiles = summary.indexFiles()
+                .stream()
+                .collect(Collectors.toMap(BlobStoreIndexShardSnapshot.FileInfo::physicalName, Function.identity()));
+            for (final var snapshotFile : shardSnapshot.indexFiles()) {
+                if (summaryFiles.get(snapshotFile.physicalName()) == null) {
+                    addAnomaly(Anomaly.FILE_IN_SNAPSHOT_NOT_SHARD_GENERATION, shardSnapshotRefs, (builder, params) -> {
+                        snapshotDescription.writeXContent(builder);
+                        indexDescription.writeXContent(builder);
+                        builder.field("shard", shardId);
+                        builder.field("file_name", snapshotFile.physicalName());
+                        return builder;
+                    });
+                }
+            }
+        }
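+
+        // "Virtual" blobs, whose metadata hash equals their full contents, have no separate blob in the shard
+        // container, so only their recorded length can be checked; for all other files every part must be
+        // present in the container with the expected size.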
+
+        private boolean verifyFileInfo(
+            RefCounted shardSnapshotRefs,
+            SnapshotDescription snapshotDescription,
+            IndexDescription indexDescription,
+            int shardId,
+            Map<String, BlobMetadata> shardBlobs,
+            BlobStoreIndexShardSnapshot.FileInfo fileInfo
+        ) {
+            final var fileLength = ByteSizeValue.ofBytes(fileInfo.length());
+            if (fileInfo.metadata().hashEqualsContents()) {
+                final var actualLength = ByteSizeValue.ofBytes(fileInfo.metadata().hash().length);
+                if (fileLength.getBytes() != actualLength.getBytes()) {
+                    addAnomaly(Anomaly.MISMATCHED_VIRTUAL_BLOB_LENGTH, shardSnapshotRefs, ((builder, params) -> {
+                        snapshotDescription.writeXContent(builder);
+                        indexDescription.writeXContent(builder);
+                        builder.field("shard", shardId);
+                        builder.field("file_name", fileInfo.physicalName());
+                        builder.humanReadableField("actual_length_in_bytes", "actual_length", actualLength);
+                        builder.humanReadableField("expected_length_in_bytes", "expected_length", fileLength);
+                        return builder;
+                    }));
+                    return false;
+                }
+            } else {
+                for (int part = 0; part < fileInfo.numberOfParts(); part++) {
+                    final var finalPart = part;
+                    final var blobName = fileInfo.partName(part);
+                    final var blobInfo = shardBlobs.get(blobName);
+                    final var partLength = ByteSizeValue.ofBytes(fileInfo.partBytes(part));
+                    if (blobInfo == null) {
+                        addAnomaly(Anomaly.MISSING_BLOB, shardSnapshotRefs, ((builder, params) -> {
+                            snapshotDescription.writeXContent(builder);
+                            indexDescription.writeXContent(builder);
+                            builder.field("shard", shardId);
+                            builder.field("blob_name", blobName);
+                            builder.field("file_name", fileInfo.physicalName());
+                            builder.field("part", finalPart);
+                            builder.field("number_of_parts", fileInfo.numberOfParts());
+                            builder.humanReadableField("file_length_in_bytes", "file_length", fileLength);
+                            builder.humanReadableField("part_length_in_bytes", "part_length", partLength);
+                            return builder;
+                        }));
+                        return false;
+                    } else if (blobInfo.length() != partLength.getBytes()) {
+                        addAnomaly(Anomaly.MISMATCHED_BLOB_LENGTH, shardSnapshotRefs, ((builder, params) -> {
+                            snapshotDescription.writeXContent(builder);
+                            indexDescription.writeXContent(builder);
+                            builder.field("shard", shardId);
+                            builder.field("blob_name", blobName);
+                            builder.field("file_name", fileInfo.physicalName());
+                            builder.field("part", finalPart);
+                            builder.field("number_of_parts", fileInfo.numberOfParts());
+                            builder.humanReadableField("file_length_in_bytes", "file_length", fileLength);
+                            builder.humanReadableField("part_length_in_bytes", "part_length", partLength);
+                            builder.humanReadableField("actual_length_in_bytes", "actual_length", ByteSizeValue.ofBytes(blobInfo.length()));
+                            return builder;
+                        }));
+                        return false;
+                    }
+                }
+            }
+            return true;
+        }
+    }
+
+    private <T> ActionListener<T> makeListener(RefCounted refCounted, CheckedConsumer<T, Exception> consumer) {
+        refCounted.incRef();
+        return ActionListener.runAfter(
+            ActionListener.wrap(consumer, exception -> addExceptionResult(refCounted, exception)),
+            refCounted::decRef
+        );
+    }
+
+    private void addExceptionResult(RefCounted refCounted, Exception exception) {
+        if (isCancelledSupplier.getAsBoolean() && exception instanceof TaskCancelledException) {
+            return;
+        }
+        addAnomaly(Anomaly.UNEXPECTED_EXCEPTION, refCounted, (builder, params) -> {
+            ElasticsearchException.generateFailureXContent(builder, params, exception, true);
+            return builder;
+        });
+    }
+
+    private Runnable wrapRunnable(RefCounted refCounted, Runnable runnable) {
+        refCounted.incRef();
+        return () -> {
+            try {
+                runnable.run();
+            } finally {
+                refCounted.decRef();
+            }
+        };
+    }
+
+    private <T> void forkSupply(CheckedSupplier<T, Exception> supplier, ActionListener<T> listener) {
+        fork(ActionRunnable.supply(listener, supplier));
+    }
+
+    private void fork(AbstractRunnable runnable) {
+        executorQueue.add(runnable);
+        tryProcessQueue();
+    }
+
+    private void tryProcessQueue() {
+        while (threadPoolPermits.tryAcquire()) {
+            final var runnable = executorQueue.poll();
+            if (runnable == null) {
+                threadPoolPermits.release();
+                return;
+            }
+
+            if (isCancelledSupplier.getAsBoolean()) {
+                try {
+                    runnable.onFailure(new TaskCancelledException("task cancelled"));
+                    continue;
+                } finally {
+                    threadPoolPermits.release();
+                }
+            }
+
+            blobStoreRepository.threadPool().executor(ThreadPool.Names.SNAPSHOT_META).execute(new AbstractRunnable() {
+                @Override
+                public void onRejection(Exception e) {
+                    try {
+                        runnable.onRejection(e);
+                    } finally {
+                        threadPoolPermits.release();
+                        // no need to call tryProcessQueue() again here, we're still running it
+                    }
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    try {
+                        runnable.onFailure(e);
+                    } finally {
+                        onCompletion();
+                    }
+                }
+
+                @Override
+                protected void doRun() {
+                    runnable.run();
+                    onCompletion();
+                }
+
+                @Override
+                public String toString() {
+                    return runnable.toString();
+                }
+
+                private void onCompletion() {
+                    threadPoolPermits.release();
+                    tryProcessQueue();
+                }
+            });
+        }
+    }
+
+    private <T> void forkSupply(RefCounted refCounted, CheckedSupplier<T, Exception> supplier, CheckedConsumer<T, Exception> consumer) {
+        forkSupply(supplier, makeListener(refCounted, consumer));
+    }
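+
+    // fork() and tryProcessQueue() bound this task's use of the SNAPSHOT_META pool: queued work only runs while
+    // one of the threadPoolPermits is held, and each completed or cancelled item releases its permit and drains
+    // the queue again.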
+
+    private void onCompletion() {
+        final var completionRefs = AbstractRefCounted.of(
+            () -> client.admin()
+                .indices()
+                .prepareFlush(RESULTS_INDEX)
+                .execute(
+                    finalListener.delegateFailure(
+                        (l1, ignored1) -> client.admin()
+                            .indices()
+                            .prepareRefresh(RESULTS_INDEX)
+                            .execute(l1.delegateFailure((l2, ignored2) -> l2.onResponse(anomalyCount.get())))
+                    )
+                )
+        );
+        try {
+            blobStoreRepository.getRepositoryData(makeListener(completionRefs, finalRepositoryData -> {
+                final var finalRepositoryGeneration = finalRepositoryData.getGenId();
+                addResult(completionRefs, (builder, params) -> {
+                    builder.field("completed", true);
+                    builder.field("cancelled", isCancelledSupplier.getAsBoolean());
+                    builder.field("final_repository_generation", finalRepositoryGeneration);
+                    builder.field("total_anomalies", anomalyCount.get());
+                    return builder;
+                });
+            }));
+        } finally {
+            completionRefs.decRef();
+        }
+    }
+
+    private static <T> void runThrottled(
+        Iterator<T> iterator,
+        BiConsumer<RefCounted, T> itemConsumer,
+        int maxConcurrency,
+        AtomicLong progressCounter,
+        Runnable onCompletion
+    ) {
+        ThrottledIterator.run(iterator, itemConsumer, maxConcurrency, progressCounter::incrementAndGet, onCompletion);
+    }
+
+    private final Queue<Tuple<IndexRequest, Runnable>> pendingResults = ConcurrentCollections.newQueue();
+    private final Semaphore resultsIndexingSemaphore = new Semaphore(1);
+
+    private void indexResultDoc(IndexRequest indexRequest, Runnable onCompletion) {
+        if (indexRequest == null) {
+            onCompletion.run();
+            return;
+        }
+        pendingResults.add(Tuple.tuple(indexRequest, onCompletion));
+        processPendingResults();
+    }
+
+    private void processPendingResults() {
+        while (resultsIndexingSemaphore.tryAcquire()) {
+            final var bulkRequest = new BulkRequest();
+            final var completionActions = new ArrayList<Runnable>();
+
+            Tuple<IndexRequest, Runnable> nextItem;
+            while ((nextItem = pendingResults.poll()) != null) {
+                bulkRequest.add(nextItem.v1());
+                completionActions.add(nextItem.v2());
+            }
+
+            if (completionActions.isEmpty()) {
+                resultsIndexingSemaphore.release();
+                return;
+            }
+
+            final var isRecursing = new AtomicBoolean(true);
+            client.bulk(bulkRequest, ActionListener.wrap(() -> {
+                resultsIndexingSemaphore.release();
+                for (final var completionAction : completionActions) {
+                    completionAction.run();
+                }
+                if (isRecursing.get() == false) {
+                    processPendingResults();
+                }
+            }).delegateResponse((l, e) -> {
+                logger.error("error indexing results", e);
+                l.onFailure(e);
+            }));
+            isRecursing.set(false);
+        }
+    }
+
+    private static final DateFormatter dateFormatter = DateFormatter.forPattern(FormatNames.ISO8601.getName()).withLocale(Locale.ROOT);
+
+    private IndexRequest buildResultDoc(ToXContent toXContent) {
+        try (var builder = XContentFactory.jsonBuilder()) {
+            builder.startObject();
+            builder.field(
+                "@timestamp",
+                dateFormatter.format(Instant.ofEpochMilli(blobStoreRepository.threadPool().absoluteTimeInMillis()))
+            );
+            builder.field("repository", repositoryName);
+            builder.field("uuid", repositoryData.getUuid());
+            builder.field("repository_generation", repositoryData.getGenId());
+            toXContent.toXContent(builder, ToXContent.EMPTY_PARAMS);
+            builder.endObject();
+            return new IndexRequestBuilder(client, IndexAction.INSTANCE, RESULTS_INDEX).setSource(builder).request();
+        } catch (Exception e) {
+            logger.error("error generating failure output", e);
+            return null;
+        }
+    }
+
+    private void addAnomaly(Anomaly anomaly, RefCounted refCounted, ToXContent toXContent) {
+        anomalyCount.incrementAndGet();
+        addResult(refCounted, (builder, params) -> toXContent.toXContent(builder.field("anomaly", anomaly.toString()), params));
+    }
+
+    private void addResult(RefCounted refCounted, ToXContent toXContent) {
+        refCounted.incRef();
+        indexResultDoc(buildResultDoc(toXContent), refCounted::decRef);
+    }
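+
+    // Every result document carries @timestamp, repository name, UUID and generation (see buildResultDoc), plus
+    // the anomaly- or summary-specific fields contributed by the ToXContent fragments below.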
+
+    private record SnapshotDescription(SnapshotId snapshotId, long startTimeMillis, long endTimeMillis) {
+        void writeXContent(XContentBuilder builder) throws IOException {
+            builder.startObject("snapshot");
+            builder.field("id", snapshotId.getUUID());
+            builder.field("name", snapshotId.getName());
+            builder.field("start_time_millis", startTimeMillis);
+            builder.field("end_time_millis", endTimeMillis);
+            builder.endObject();
+        }
+    }
+
+    private record IndexDescription(IndexId indexId, String indexMetadataBlob, int shardCount) {
+        void writeXContent(XContentBuilder builder) throws IOException {
+            builder.startObject("index");
+            writeIndexId(indexId, builder);
+            builder.field("metadata_blob", indexMetadataBlob);
+            builder.field("shards", shardCount);
+            builder.endObject();
+        }
+    }
+
+    private static void writeIndexId(IndexId indexId, XContentBuilder builder) throws IOException {
+        builder.field("id", indexId.getId());
+        builder.field("name", indexId.getName());
+    }
+
+}
diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestVerifyRepositoryIntegrityAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestVerifyRepositoryIntegrityAction.java
new file mode 100644
index 0000000000000..6f6696b6655f0
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestVerifyRepositoryIntegrityAction.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.rest.action.admin.cluster;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.cluster.repositories.integrity.VerifyRepositoryIntegrityAction;
+import org.elasticsearch.client.internal.node.NodeClient;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.RestResponse;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.tasks.TaskId;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.elasticsearch.rest.RestRequest.Method.POST;
+
+public class RestVerifyRepositoryIntegrityAction extends BaseRestHandler {
+    @Override
+    public List<Route> routes() {
+        return List.of(new Route(POST, "/_snapshot/{repository}/_verify_integrity"));
+    }
+
+    @Override
+    public String getName() {
+        return "verify_repository_integrity_action";
+    }
+
+    @Override
+    public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
+        final var verifyRequest = new VerifyRepositoryIntegrityAction.Request(
+            request.param("repository"),
+            request.paramAsStringArray("indices", Strings.EMPTY_ARRAY),
+            request.paramAsInt("thread_pool_concurrency", 0),
+            request.paramAsInt("snapshot_verification_concurrency", 0),
+            request.paramAsInt("index_verification_concurrency", 0),
+            request.paramAsInt("index_snapshot_verification_concurrency", 0)
+        );
+        verifyRequest.masterNodeTimeout(request.paramAsTime("master_timeout", verifyRequest.masterNodeTimeout()));
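+        // Responds immediately with the task id; progress is exposed via the task's status while the results
+        // are indexed into the metadata_verification_results index. For example (repository name illustrative):
+        //
+        //   POST /_snapshot/my_repository/_verify_integrity?snapshot_verification_concurrency=2
+        //   => { "task" : "<local-node-id>:<task-id>" }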
+        return channel -> {
+            final var task = client.executeLocally(VerifyRepositoryIntegrityAction.INSTANCE, verifyRequest, ActionListener.noop());
+            try (var builder = channel.newBuilder()) {
+                builder.startObject();
+                builder.field("task", new TaskId(client.getLocalNodeId(), task.getId()).toString());
+                builder.endObject();
+                channel.sendResponse(new RestResponse(RestStatus.OK, builder));
+            }
+        };
+    }
+}
diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/ThrottledIteratorTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/ThrottledIteratorTests.java
new file mode 100644
index 0000000000000..e4d8f73af4bbb
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/ThrottledIteratorTests.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.common.util.concurrent;
+
+import org.apache.lucene.tests.util.LuceneTestCase;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.FixedExecutorBuilder;
+import org.elasticsearch.threadpool.ScalingExecutorBuilder;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BooleanSupplier;
+import java.util.stream.IntStream;
+
+public class ThrottledIteratorTests extends ESTestCase {
+    private static final String CONSTRAINED = "constrained";
+    private static final String RELAXED = "relaxed";
+
+    public void testConcurrency() throws InterruptedException {
+        final var maxConstrainedThreads = between(1, 3);
+        final var maxRelaxedThreads = between(1, 100);
+        final var constrainedQueue = between(3, 6);
+        final var threadPool = new TestThreadPool(
+            "test",
+            new FixedExecutorBuilder(Settings.EMPTY, CONSTRAINED, maxConstrainedThreads, constrainedQueue, CONSTRAINED, false),
+            new ScalingExecutorBuilder(RELAXED, 1, maxRelaxedThreads, TimeValue.timeValueSeconds(30), true)
+        );
+        try {
+            final var items = between(1, 10000); // large enough that inadvertent recursion will trigger a StackOverflowError
+            final var itemStartLatch = new CountDownLatch(items);
+            final var completedItems = new AtomicInteger();
+            final var maxConcurrency = between(1, (constrainedQueue + maxConstrainedThreads) * 2);
+            final var itemPermits = new Semaphore(maxConcurrency);
+            final var completionLatch = new CountDownLatch(1);
+            final BooleanSupplier forkSupplier = randomFrom(
+                () -> false,
+                ESTestCase::randomBoolean,
+                LuceneTestCase::rarely,
+                LuceneTestCase::usually,
+                () -> true
+            );
+            final var blockPermits = new Semaphore(between(0, Math.min(maxRelaxedThreads, maxConcurrency) - 1));
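+
+            // Items either complete inline or fork to a randomly chosen executor; itemPermits asserts that at
+            // most maxConcurrency items are ever in flight, whichever path each item takes.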
+            ThrottledIterator.run(IntStream.range(0, items).boxed().iterator(), (refs, item) -> {
+                assertTrue(itemPermits.tryAcquire());
+                if (forkSupplier.getAsBoolean()) {
+                    refs.incRef();
+                    final var executor = randomFrom(CONSTRAINED, RELAXED);
+                    threadPool.executor(executor).execute(new AbstractRunnable() {
+
+                        @Override
+                        public void onRejection(Exception e) {
+                            assertEquals(CONSTRAINED, executor);
+                            itemStartLatch.countDown();
+                        }
+
+                        @Override
+                        protected void doRun() {
+                            itemStartLatch.countDown();
+                            if (RELAXED.equals(executor) && randomBoolean() && blockPermits.tryAcquire()) {
+                                // simulate at most (maxConcurrency-1) long-running operations, to demonstrate that they don't
+                                // hold up the processing of the other operations
+                                try {
+                                    assertTrue(itemStartLatch.await(30, TimeUnit.SECONDS));
+                                } catch (InterruptedException e) {
+                                    throw new AssertionError("unexpected", e);
+                                } finally {
+                                    blockPermits.release();
+                                }
+                            }
+                        }
+
+                        @Override
+                        public void onAfter() {
+                            itemPermits.release();
+                            refs.decRef();
+                        }
+
+                        @Override
+                        public void onFailure(Exception e) {
+                            throw new AssertionError("unexpected", e);
+                        }
+                    });
+                } else {
+                    itemStartLatch.countDown();
+                    itemPermits.release();
+                }
+            }, maxConcurrency, completedItems::incrementAndGet, completionLatch::countDown);
+
+            assertTrue(completionLatch.await(30, TimeUnit.SECONDS));
+            assertEquals(items, completedItems.get());
+            assertTrue(itemPermits.tryAcquire(maxConcurrency));
+            assertTrue(itemStartLatch.await(0, TimeUnit.SECONDS));
+        } finally {
+            ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
+        }
+    }
+}
diff --git a/test/framework/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshotsIntegritySuppressor.java b/test/framework/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshotsIntegritySuppressor.java
new file mode 100644
index 0000000000000..d22db399a2f34
--- /dev/null
+++ b/test/framework/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshotsIntegritySuppressor.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.index.snapshots.blobstore;
+
+import org.elasticsearch.core.Releasable;
+
+public class BlobStoreIndexShardSnapshotsIntegritySuppressor implements Releasable {
+
+    public BlobStoreIndexShardSnapshotsIntegritySuppressor() {
+        BlobStoreIndexShardSnapshots.INTEGRITY_ASSERTIONS_ENABLED = false;
+    }
+
+    @Override
+    public void close() {
+        BlobStoreIndexShardSnapshots.INTEGRITY_ASSERTIONS_ENABLED = true;
+    }
+}
diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java
index 65037651cf4cb..19c42a77337db 100644
--- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java
+++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java
@@ -53,6 +53,7 @@ public class Constants {
         "cluster:admin/repository/get",
         "cluster:admin/repository/put",
         "cluster:admin/repository/verify",
+        "cluster:admin/repository/verify_integrity",
         "cluster:admin/reroute",
         "cluster:admin/script/delete",
         "cluster:admin/script/get",