Avoid atomic write of large blobs in repo analyzer (elastic#69960)
Today we randomly perform an atomic write even if there are no early
reads, but we should only do so if the blob is small enough to write
atomically.
DaveCTurner committed Mar 4, 2021
1 parent 33bd1a2 commit 02cc01d
Showing 3 changed files with 51 additions and 6 deletions.
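
The heart of the change is visible in the second file below: the random decision to perform an atomic write is now taken only when the blob is small enough for the atomic API. As a minimal standalone sketch of the new rule (the decideAtomic helper is hypothetical; the names otherwise follow the diff):

import java.util.Random;

final class AtomicWriteDecision {
    // The atomic write API is built on BytesReference, which addresses bytes with
    // int, so a blob larger than Integer.MAX_VALUE bytes cannot be written atomically.
    static final long MAX_ATOMIC_WRITE_SIZE = Integer.MAX_VALUE;

    // Hypothetical helper mirroring the guard added in this commit: an early read
    // still forces an atomic write (oversized blobs never request early reads),
    // but the random coin flip now only happens for small enough blobs.
    static boolean decideAtomic(boolean readEarly, long targetLength, Random random) {
        return readEarly || (targetLength <= MAX_ATOMIC_WRITE_SIZE && random.nextBoolean());
    }
}

Before the change the coin flip happened unconditionally, so a multi-gigabyte blob could be routed to an API that cannot represent its length.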
File 1 of 3 (YAML REST test):

@@ -18,6 +18,15 @@ setup:
             readonly: true
             location: "test_repo_loc"
 
+  - do:
+      snapshot.create_repository:
+        repository: test_repo_slow
+        body:
+          type: fs
+          settings:
+            max_snapshot_bytes_per_sec: "1b"
+            location: "test_repo_loc"
+
 ---
 "Analysis fails on readonly repositories":
   - skip:
@@ -145,3 +154,26 @@ setup:
   - is_false: summary.read.max_wait
   - is_false: summary.read.total_throttled
   - is_false: summary.read.total_elapsed
+
+---
+"Timeout with large blobs":
+  - skip:
+      version: "- 7.11.99"
+      reason: "introduced in 7.12"
+
+  - do:
+      catch: request
+      snapshot.repository_analyze:
+        repository: test_repo_slow
+        blob_count: 1
+        concurrency: 1
+        max_blob_size: 2gb
+        max_total_data_size: 2gb
+        detailed: false
+        human: false
+        timeout: 1s
+
+  - match: { status: 500 }
+  - match: { error.type: repository_verification_exception }
+  - match: { error.reason: "/.*test_repo_slow..analysis.failed.*/" }
+  - match: { error.root_cause.0.type: receive_timeout_transport_exception }
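
The new YAML test above drives the repository analysis API directly. For reference, the equivalent call via the low-level Java REST client might look like the following sketch (the client address and setup are assumptions; the parameters mirror the test):

import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

public class AnalyzeSlowRepository {
    public static void main(String[] args) throws Exception {
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            // One large blob through a repository throttled to 1 byte/sec, with a
            // deliberately short timeout, as in the "Timeout with large blobs" test.
            Request request = new Request("POST", "/_snapshot/test_repo_slow/_analyze");
            request.addParameter("blob_count", "1");
            request.addParameter("concurrency", "1");
            request.addParameter("max_blob_size", "2gb");
            request.addParameter("max_total_data_size", "2gb");
            request.addParameter("timeout", "1s");
            // Expected to fail here with a 500 repository_verification_exception.
            Response response = client.performRequest(request);
            System.out.println(response.getStatusLine());
        }
    }
}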
File 2 of 3 (Java, BlobAnalyzeAction):

@@ -25,6 +25,7 @@
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.InputStreamStreamInput;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -188,6 +189,12 @@ protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
         }
     }
 
+    /**
+     * The atomic write API is based around a {@link BytesReference} which uses {@code int} for lengths and offsets and so on, so we can
+     * only use it to write a blob with size at most {@link Integer#MAX_VALUE}.
+     */
+    static final long MAX_ATOMIC_WRITE_SIZE = Integer.MAX_VALUE;
+
     /**
      * Analysis on a single blob, performing the write(s) and orchestrating the read(s).
      */
@@ -265,10 +272,15 @@ static class BlobAnalysis {
         }
 
         void run() {
-            writeRandomBlob(request.readEarly || random.nextBoolean(), true, this::doReadBeforeWriteComplete, write1Step);
+            writeRandomBlob(
+                request.readEarly || (request.targetLength <= MAX_ATOMIC_WRITE_SIZE && random.nextBoolean()),
+                true,
+                this::doReadBeforeWriteComplete,
+                write1Step
+            );
 
             if (request.writeAndOverwrite) {
-                assert request.targetLength <= Integer.MAX_VALUE : "oversized atomic write";
+                assert request.targetLength <= MAX_ATOMIC_WRITE_SIZE : "oversized atomic write";
                 write1Step.whenComplete(ignored -> writeRandomBlob(true, false, this::doReadAfterWrite, write2Step), ignored -> {});
             } else {
                 write2Step.onResponse(null);
@@ -277,7 +289,7 @@ void run() {
         }
 
         private void writeRandomBlob(boolean atomic, boolean failIfExists, Runnable onLastRead, StepListener<WriteDetails> stepListener) {
-            assert atomic == false || request.targetLength <= Integer.MAX_VALUE : "oversized atomic write";
+            assert atomic == false || request.targetLength <= MAX_ATOMIC_WRITE_SIZE : "oversized atomic write";
             final RandomBlobContent content = new RandomBlobContent(
                 request.getRepositoryName(),
                 random.nextLong(),
@@ -296,7 +308,7 @@ private void writeRandomBlob(boolean atomic, boolean failIfExists, Runnable onLastRead, StepListener<WriteDetails> stepListener) {
             // E.g. for S3 blob containers we would like to choose somewhat more randomly between single-part and multi-part uploads,
             // rather than relying on the usual distinction based on the size of the blob.
 
-            if (atomic || (request.targetLength <= Integer.MAX_VALUE && random.nextBoolean())) {
+            if (atomic || (request.targetLength <= MAX_ATOMIC_WRITE_SIZE && random.nextBoolean())) {
                 final RandomBlobContentBytesReference bytesReference = new RandomBlobContentBytesReference(
                     content,
                     Math.toIntExact(request.getTargetLength())
@@ -613,7 +625,7 @@ public static class Request extends ActionRequest implements TaskAwareRequest {
             boolean writeAndOverwrite
         ) {
             assert 0 < targetLength;
-            assert targetLength <= Integer.MAX_VALUE || (readEarly == false && writeAndOverwrite == false) : "oversized atomic write";
+            assert targetLength <= MAX_ATOMIC_WRITE_SIZE || (readEarly == false && writeAndOverwrite == false) : "oversized atomic write";
             this.repositoryName = repositoryName;
             this.blobPath = blobPath;
             this.blobName = blobName;
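The size limit above is not arbitrary: the bytes-based write path narrows the blob length with Math.toIntExact (visible in the last hunk of this file), and that narrowing throws for any value over Integer.MAX_VALUE. A standalone illustration:

public class AtomicWriteSizeLimit {
    public static void main(String[] args) {
        long smallBlob = Integer.MAX_VALUE;      // largest length an int can represent
        long largeBlob = Integer.MAX_VALUE + 1L; // one byte too many

        System.out.println(Math.toIntExact(smallBlob)); // prints 2147483647

        try {
            // The same narrowing used when building the RandomBlobContentBytesReference.
            Math.toIntExact(largeBlob);
        } catch (ArithmeticException e) {
            // This is why writeRandomBlob asserts targetLength <= MAX_ATOMIC_WRITE_SIZE
            // before taking the atomic branch.
            System.out.println("integer overflow");
        }
    }
}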
File 3 of 3 (Java, RepositoryAnalyzeAction):

@@ -69,6 +69,7 @@
 import java.util.function.LongSupplier;
 import java.util.stream.IntStream;
 
+import static org.elasticsearch.repositories.blobstore.testkit.BlobAnalyzeAction.MAX_ATOMIC_WRITE_SIZE;
 import static org.elasticsearch.repositories.blobstore.testkit.SnapshotRepositoryTestKit.humanReadableNanos;
 
 /**
@@ -451,7 +452,7 @@ public void run() {
 
         for (int i = 0; i < request.getBlobCount(); i++) {
             final long targetLength = blobSizes.get(i);
-            final boolean smallBlob = targetLength <= Integer.MAX_VALUE; // avoid the atomic API for larger blobs
+            final boolean smallBlob = targetLength <= MAX_ATOMIC_WRITE_SIZE; // avoid the atomic API for larger blobs
             final VerifyBlobTask verifyBlobTask = new VerifyBlobTask(
                 nodes.get(random.nextInt(nodes.size())),
                 new BlobAnalyzeAction.Request(
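The smallBlob flag computed here has to agree with the assertion in BlobAnalyzeAction.Request above, which forbids readEarly and writeAndOverwrite for oversized blobs (both force an atomic write). The request construction is elided from this diff, so the following gating is a hypothetical sketch of that constraint, not the actual code:

import java.util.Random;

final class BlobFlagGating {
    // Hypothetical: only blobs small enough for the atomic API may opt in to the
    // behaviours that require an atomic write.
    static boolean[] chooseFlags(boolean smallBlob, Random random) {
        boolean readEarly = smallBlob && random.nextBoolean();
        boolean writeAndOverwrite = smallBlob && random.nextBoolean();
        return new boolean[] { readEarly, writeAndOverwrite };
    }
}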
