Improve reaction to blob store corruptions #111954

Merged
7 changes: 2 additions & 5 deletions docs/reference/release-notes/8.15.0.asciidoc
@@ -16,6 +16,8 @@ after it is killed up to four times in 24 hours. (issue: {es-issue}110530[#110530])
 * Pipeline aggregations under `time_series` and `categorize_text` aggregations are never
   returned (issue: {es-issue}111679[#111679])
 
+* Elasticsearch will not start on Windows machines when the recommended [bootstrap.memory_lock: true](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup-configuration-memory.html#bootstrap-memory_lock) setting is configured due to [native access refactoring](https://github.com/elastic/elasticsearch/pull/111866). The workaround for 8.15.0 is to downgrade to the previous version. This issue will be fixed in 8.15.1.
+
 [[breaking-8.15.0]]
 [float]
 === Breaking changes

@@ -32,11 +34,6 @@ Rollup::
 Search::
 * Change `skip_unavailable` remote cluster setting default value to true {es-pull}105792[#105792]
 
-[[known-issues-8.15.0]]
-[float]
-=== Known issues
-* Elasticsearch will not start on Windows machines when the recommended [bootstrap.memory_lock: true](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup-configuration-memory.html#bootstrap-memory_lock) setting is configured due to [native access refactoring](https://github.com/elastic/elasticsearch/pull/111866). The workaround for 8.15.0 is to downgrade to the previous version. This issue will be fixed in 8.15.1.
-
 [[bug-8.15.0]]
 [float]
 === Bug fixes

186 changes: 186 additions & 0 deletions BlobStoreCorruptionIT.java (new file)
@@ -0,0 +1,186 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the Server Side Public License, v 1; you may not use this file except
 * in compliance with, at your election, the Elastic License 2.0 or the Server
 * Side Public License, v 1.
 */

package org.elasticsearch.repositories.blobstore;

import org.apache.lucene.tests.mockfile.ExtrasFS;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshotsIntegritySuppressor;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
import org.junit.Before;

import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;

public class BlobStoreCorruptionIT extends AbstractSnapshotIntegTestCase {

    private static final Logger logger = LogManager.getLogger(BlobStoreCorruptionIT.class);

    @Before
    public void suppressConsistencyCheck() {
        disableRepoConsistencyCheck("testing corruption detection involves breaking the repo");
    }

    public void testCorruptionDetection() throws Exception {
        final var repositoryName = randomIdentifier();
        final var indexName = randomIdentifier();
        final var snapshotName = randomIdentifier();
        final var repositoryRootPath = randomRepoPath();

        createRepository(repositoryName, FsRepository.TYPE, repositoryRootPath);
        createIndexWithRandomDocs(indexName, between(1, 100));
        flushAndRefresh(indexName);
        createSnapshot(repositoryName, snapshotName, List.of(indexName));

        final var corruptedFile = corruptRandomFile(repositoryRootPath);
        final var corruptedFileType = RepositoryFileType.getRepositoryFileType(repositoryRootPath, corruptedFile);
        final var corruptionDetectors = new ArrayList<CheckedConsumer<ActionListener<Exception>, ?>>();

        // detect corruption by listing the snapshots
        if (corruptedFileType == RepositoryFileType.SNAPSHOT_INFO) {
            corruptionDetectors.add(exceptionListener -> {
                logger.info("--> listing snapshots");
                client().admin()
                    .cluster()
                    .prepareGetSnapshots(TEST_REQUEST_TIMEOUT, repositoryName)
                    .execute(ActionTestUtils.assertNoSuccessListener(exceptionListener::onResponse));
            });
        }

        // detect corruption by taking another snapshot
        if (corruptedFileType == RepositoryFileType.SHARD_GENERATION) {
            corruptionDetectors.add(exceptionListener -> {
                logger.info("--> taking another snapshot");
                client().admin()
                    .cluster()
                    .prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, randomIdentifier())
                    .setWaitForCompletion(true)
                    .execute(exceptionListener.map(createSnapshotResponse -> {
                        assertNotEquals(SnapshotState.SUCCESS, createSnapshotResponse.getSnapshotInfo().state());
                        return new ElasticsearchException("create-snapshot failed as expected");
                    }));
            });
        }

        // detect corruption by restoring the snapshot
        switch (corruptedFileType) {
            case SNAPSHOT_INFO, GLOBAL_METADATA, INDEX_METADATA -> corruptionDetectors.add(exceptionListener -> {
                logger.info("--> restoring snapshot");
                client().admin()
                    .cluster()
                    .prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, snapshotName)
                    .setRestoreGlobalState(corruptedFileType == RepositoryFileType.GLOBAL_METADATA || randomBoolean())
                    .setWaitForCompletion(true)
                    .execute(ActionTestUtils.assertNoSuccessListener(exceptionListener::onResponse));
            });
            case SHARD_SNAPSHOT_INFO, SHARD_DATA -> corruptionDetectors.add(exceptionListener -> {
                logger.info("--> restoring snapshot and checking for failed shards");
                SubscribableListener
                    // if shard-level data is corrupted then the overall restore succeeds but the shard recoveries fail
                    .<AcknowledgedResponse>newForked(l -> client().admin().indices().prepareDelete(indexName).execute(l))
                    .andThenAccept(ElasticsearchAssertions::assertAcked)

                    .<RestoreSnapshotResponse>andThen(
                        l -> client().admin()
                            .cluster()
                            .prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, snapshotName)
                            .setRestoreGlobalState(randomBoolean())
                            .setWaitForCompletion(true)
                            .execute(l)
                    )

                    .addListener(exceptionListener.map(restoreSnapshotResponse -> {
                        assertNotEquals(0, restoreSnapshotResponse.getRestoreInfo().failedShards());
                        return new ElasticsearchException("post-restore recoveries failed as expected");
                    }));
            });
        }

        try (var ignored = new BlobStoreIndexShardSnapshotsIntegritySuppressor()) {
            final var exception = safeAwait(randomFrom(corruptionDetectors));
            logger.info(Strings.format("--> corrupted [%s] and caught exception", corruptedFile), exception);

[Review comment from a Contributor]

Could this potentially be considered an example of using randomised testing for coverage?

It seems we've enumerated the ways in which we expect to detect each type of corruption, and then we're randomly picking one to execute. Would it not be better to test them all? Or does the execution of one have side effects precluding the others from running?

More a question than a comment.

[Reply from the Author]

Ehh, kinda, though the coverage is extremely low even if we did check all of these paths on each test run. The assertions we were previously hitting would only trip if you corrupted a very specific byte in exactly the right way.

Really this is just to verify that a corruption doesn't trip assertions, so that we can proceed with #93735, which will introduce a way to catch all of these problems at once.

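For illustration, here is a minimal sketch of the "test them all" variant raised in the thread. It assumes the detectors are safe to run sequentially against the same corrupted repository, which the thread leaves open (note the restore detectors delete and re-restore the index as a side effect):

// Sketch only, not the PR's approach: exercise every applicable detector in one run.
// Assumes each detector tolerates the repository and cluster state left behind by
// the previous one, which the review thread above does not settle.
try (var ignored = new BlobStoreIndexShardSnapshotsIntegritySuppressor()) {
    for (CheckedConsumer<ActionListener<Exception>, ?> detector : corruptionDetectors) {
        final Exception exception = safeAwait(detector);
        logger.info(Strings.format("--> corrupted [%s] and caught exception", corruptedFile), exception);
    }
}
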
        }
    }

    private static Path corruptRandomFile(Path repositoryRootPath) throws IOException {
        final var corruptedFileType = getRandomCorruptibleFileType();
        final var corruptedFile = getRandomFileToCorrupt(repositoryRootPath, corruptedFileType);
        if (randomBoolean()) {
            logger.info("--> deleting [{}]", corruptedFile);
            Files.delete(corruptedFile);
        } else {
            corruptFileContents(corruptedFile);
        }
        return corruptedFile;
    }

    private static void corruptFileContents(Path fileToCorrupt) throws IOException {
        final var oldFileContents = Files.readAllBytes(fileToCorrupt);
        logger.info("--> contents of [{}] before corruption: [{}]", fileToCorrupt, Base64.getEncoder().encodeToString(oldFileContents));
        final byte[] newFileContents = new byte[randomBoolean() ? oldFileContents.length : between(0, oldFileContents.length)];
        System.arraycopy(oldFileContents, 0, newFileContents, 0, newFileContents.length);
        if (newFileContents.length == oldFileContents.length) {
            final var corruptionPosition = between(0, newFileContents.length - 1);
            newFileContents[corruptionPosition] = randomValueOtherThan(oldFileContents[corruptionPosition], ESTestCase::randomByte);
            logger.info(
                "--> updating byte at position [{}] from [{}] to [{}]",
                corruptionPosition,
                oldFileContents[corruptionPosition],
                newFileContents[corruptionPosition]
            );
        } else {
            logger.info("--> truncating file from length [{}] to length [{}]", oldFileContents.length, newFileContents.length);
        }
        Files.write(fileToCorrupt, newFileContents);
        logger.info("--> contents of [{}] after corruption: [{}]", fileToCorrupt, Base64.getEncoder().encodeToString(newFileContents));
    }

    private static RepositoryFileType getRandomCorruptibleFileType() {
        return randomValueOtherThanMany(
            // these blob types do not have reliable corruption detection, so we must skip them
            t -> t == RepositoryFileType.ROOT_INDEX_N || t == RepositoryFileType.ROOT_INDEX_LATEST,
            () -> randomFrom(RepositoryFileType.values())
        );
    }

    private static Path getRandomFileToCorrupt(Path repositoryRootPath, RepositoryFileType corruptedFileType) throws IOException {
        final var corruptibleFiles = new ArrayList<Path>();
        Files.walkFileTree(repositoryRootPath, new SimpleFileVisitor<>() {
            @Override
            public FileVisitResult visitFile(Path filePath, BasicFileAttributes attrs) throws IOException {
                if (ExtrasFS.isExtra(filePath.getFileName().toString()) == false
                    && RepositoryFileType.getRepositoryFileType(repositoryRootPath, filePath) == corruptedFileType) {
                    corruptibleFiles.add(filePath);
                }
                return super.visitFile(filePath, attrs);
            }
        });
        return randomFrom(corruptibleFiles);
    }

}
BlobStoreIndexShardSnapshot.java
@@ -17,6 +17,7 @@
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.xcontent.XContentParserUtils;
 import org.elasticsearch.core.Nullable;
+import org.elasticsearch.gateway.CorruptStateException;
 import org.elasticsearch.index.store.StoreFileMetadata;
 import org.elasticsearch.xcontent.ParseField;
 import org.elasticsearch.xcontent.ToXContentFragment;
@@ -318,7 +319,11 @@ public static FileInfo fromXContent(XContentParser parser) throws IOException {
             }
             case WRITER_UUID -> {
                 writerUuid = new BytesRef(parser.binaryValue());
-                assert writerUuid.length > 0;
+                assert BlobStoreIndexShardSnapshots.INTEGRITY_ASSERTIONS_ENABLED == false || writerUuid.length > 0;
+                if (writerUuid.length == 0) {
+                    // we never write UNAVAILABLE_WRITER_UUID, so this must be due to corruption
+                    throw new ElasticsearchParseException("invalid (empty) writer uuid");
+                }
             }
             default -> XContentParserUtils.throwUnknownField(currentFieldName, parser);
         }
@@ -336,6 +341,12 @@ public static FileInfo fromXContent(XContentParser parser) throws IOException {
         } else if (checksum == null) {
             throw new ElasticsearchParseException("missing checksum for name [" + name + "]");
         }
+        try {
+            // check for corruption before asserting writtenBy is parseable in the StoreFileMetadata constructor
+            org.apache.lucene.util.Version.parse(writtenBy);
+        } catch (Exception e) {
+            throw new ElasticsearchParseException("invalid written_by [" + writtenBy + "]");
+        }
         return new FileInfo(name, new StoreFileMetadata(physicalName, length, checksum, writtenBy, metaHash, writerUuid), partSize);
     }
 
@@ -566,6 +577,11 @@ public static BlobStoreIndexShardSnapshot fromXContent(XContentParser parser) throws IOException {
             }
         }
 
+        // check for corruption before asserting snapshot != null in the BlobStoreIndexShardSnapshot ctor
+        if (snapshot == null) {
+            throw new CorruptStateException("snapshot missing");
+        }
+
         return new BlobStoreIndexShardSnapshot(
             snapshot,
             indexFiles == null ? List.of() : indexFiles,
BlobStoreIndexShardSnapshots.java
@@ -264,6 +264,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         return builder;
     }
 
+    static volatile boolean INTEGRITY_ASSERTIONS_ENABLED = true;
+
     public static BlobStoreIndexShardSnapshots fromXContent(XContentParser parser) throws IOException {
         XContentParser.Token token = parser.currentToken();
         if (token == null) { // New parser
@@ -317,7 +319,12 @@ public static BlobStoreIndexShardSnapshots fromXContent(XContentParser parser) throws IOException {
         List<FileInfo> fileInfosBuilder = new ArrayList<>();
         for (String file : entry.v2()) {
             FileInfo fileInfo = files.get(file);
-            assert fileInfo != null;
+            if (fileInfo == null) {
+                // could happen in production if the repo contents are corrupted
+                final var exception = new IllegalStateException("shard index inconsistent at file [" + file + "]");
+                assert INTEGRITY_ASSERTIONS_ENABLED == false : exception;
+                throw exception;
+            }
             fileInfosBuilder.add(fileInfo);
         }
         snapshots.add(new SnapshotFiles(entry.v1(), Collections.unmodifiableList(fileInfosBuilder), historyUUIDs.get(entry.v1())));
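
The change above follows the assert-or-throw pattern used throughout this PR: fail loudly in assertion-enabled test runs (unless the suppressor below is active), but throw a catchable exception in production, where assertions are disabled. A minimal self-contained sketch of the idiom, with hypothetical names:

// Hypothetical helper, for illustration only; the PR inlines this logic instead.
static volatile boolean INTEGRITY_ASSERTIONS_ENABLED = true;

static void ensureIntegrity(boolean condition, String message) {
    if (condition == false) {
        final var exception = new IllegalStateException(message);
        // with -ea (test runs) this trips immediately, unless suppressed via the flag;
        // without -ea (production) it falls through to an ordinary, catchable throw
        assert INTEGRITY_ASSERTIONS_ENABLED == false : exception;
        throw exception;
    }
}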
27 changes: 27 additions & 0 deletions BlobStoreIndexShardSnapshotsIntegritySuppressor.java (new file)
@@ -0,0 +1,27 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the Server Side Public License, v 1; you may not use this file except
 * in compliance with, at your election, the Elastic License 2.0 or the Server
 * Side Public License, v 1.
 */

package org.elasticsearch.index.snapshots.blobstore;

import org.elasticsearch.core.Releasable;

/**
 * Test utility class to suppress assertions about the integrity of the contents of a blobstore repository, in order to verify the
 * production behaviour on encountering invalid data.
 */
public class BlobStoreIndexShardSnapshotsIntegritySuppressor implements Releasable {

    public BlobStoreIndexShardSnapshotsIntegritySuppressor() {
        BlobStoreIndexShardSnapshots.INTEGRITY_ASSERTIONS_ENABLED = false;
    }

    @Override
    public void close() {
        BlobStoreIndexShardSnapshots.INTEGRITY_ASSERTIONS_ENABLED = true;
    }
}
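
Since the suppressor is a Releasable it fits try-with-resources, which is how BlobStoreCorruptionIT above uses it. Note that the flag it toggles is a JVM-wide static, so the suppression is not scoped to a single test thread:

// usage as in BlobStoreCorruptionIT: integrity assertions are suppressed only
// for the duration of the try block, and restored on close()
try (var ignored = new BlobStoreIndexShardSnapshotsIntegritySuppressor()) {
    // exercise code paths that parse deliberately corrupted repository contents
}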
60 changes: 60 additions & 0 deletions RepositoryFileType.java (new file)
@@ -0,0 +1,60 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the Server Side Public License, v 1; you may not use this file except
 * in compliance with, at your election, the Elastic License 2.0 or the Server
 * Side Public License, v 1.
 */

package org.elasticsearch.repositories.blobstore;

import org.elasticsearch.common.Strings;

import java.nio.file.Path;
import java.util.regex.Pattern;

/**
 * The types of blobs in a {@link BlobStoreRepository}.
 */
public enum RepositoryFileType {

    ROOT_INDEX_N("index-NUM"),
    ROOT_INDEX_LATEST("index.latest"),
    SNAPSHOT_INFO("snap-UUID.dat"),
    GLOBAL_METADATA("meta-UUID.dat"),
    INDEX_METADATA("indices/UUID/meta-SHORTUUID.dat"),
    SHARD_GENERATION("indices/UUID/NUM/index-UUID"),
    SHARD_SNAPSHOT_INFO("indices/UUID/NUM/snap-UUID.dat"),
    SHARD_DATA("indices/UUID/NUM/__UUID"),
    // NB no support for legacy names (yet)
    ;

    private final Pattern pattern;

    RepositoryFileType(String regex) {
        pattern = Pattern.compile(
            "^("
                + regex
                    // decimal numbers
                    .replace("NUM", "(0|[1-9][0-9]*)")
                    // 15-byte UUIDs (20 base64 chars) from TimeBasedUUIDGenerator
                    .replace("SHORTUUID", "[0-9a-zA-Z_-]{20}")
                    // 16-byte UUIDs (22 base64 chars) from RandomBasedUUIDGenerator
                    .replace("UUID", "[0-9a-zA-Z_-]{22}")
                + ")$"
        );
    }

    public static RepositoryFileType getRepositoryFileType(Path repositoryRoot, Path blobPath) {
        final var relativePath = repositoryRoot.relativize(blobPath).toString().replace(repositoryRoot.getFileSystem().getSeparator(), "/");
        for (final var repositoryFileType : RepositoryFileType.values()) {
            if (repositoryFileType.pattern.matcher(relativePath).matches()) {
                return repositoryFileType;
            }
        }
        throw new IllegalArgumentException(
            Strings.format("[%s] is not the path of a known blob type within [%s]", relativePath, repositoryRoot)
        );
    }

}
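
A small usage sketch of the classification; the repository root and the 22-character base64 UUIDs below are invented for illustration:

// illustration only; the root path and UUIDs are made up
Path repositoryRoot = Path.of("/tmp/repo");
Path blobPath = repositoryRoot.resolve("indices/Wc0maLNWSUW1republwbKA/0/snap-T2rkbFOLRv2thnd3GB5hXg.dat");

// matches SHARD_SNAPSHOT_INFO ("indices/UUID/NUM/snap-UUID.dat")
RepositoryFileType type = RepositoryFileType.getRepositoryFileType(repositoryRoot, blobPath);
assert type == RepositoryFileType.SHARD_SNAPSHOT_INFO;

// a path outside the known layout throws IllegalArgumentException, e.g.
// RepositoryFileType.getRepositoryFileType(repositoryRoot, repositoryRoot.resolve("foo.txt"));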