Avoid atomic write of large blobs in repo analyzer #69960

Merged
@@ -18,6 +18,15 @@ setup:
readonly: true
location: "test_repo_loc"

- do:
snapshot.create_repository:
repository: test_repo_slow
body:
type: fs
settings:
max_snapshot_bytes_per_sec: "1b"
location: "test_repo_loc"

---
"Analysis fails on readonly repositories":
- skip:
@@ -145,3 +154,26 @@ setup:
- is_false: summary.read.max_wait
- is_false: summary.read.total_throttled
- is_false: summary.read.total_elapsed

---
"Timeout with large blobs":
- skip:
version: "- 7.11.99"
reason: "introduced in 7.12"

- do:
catch: request
snapshot.repository_analyze:
repository: test_repo_slow
blob_count: 1
concurrency: 1
max_blob_size: 2gb
max_total_data_size: 2gb
detailed: false
human: false
timeout: 1s

- match: { status: 500 }
- match: { error.type: repository_verification_exception }
- match: { error.reason: "/.*test_repo_slow..analysis.failed.*/" }
- match: { error.root_cause.0.type: receive_timeout_transport_exception }
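
The new "Timeout with large blobs" test registers test_repo_slow with max_snapshot_bytes_per_sec throttled to a single byte per second, then analyzes one 2gb blob with a 1s timeout, so the analysis cannot finish and must surface as a 500 repository_verification_exception whose root cause is a receive_timeout_transport_exception. As a rough illustration only (not part of this change), the same scenario can be driven from the low-level Java REST client; the endpoint and parameters mirror the YAML above, while the localhost address and the pre-registered throttled repository are assumptions:

// Illustrative sketch only: drive the same timeout scenario against a local node.
// Assumes localhost:9200 and that "test_repo_slow" is already registered as above.
import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;

public class RepoAnalyzeTimeoutExample {
    public static void main(String[] args) throws Exception {
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            Request analyze = new Request("POST", "/_snapshot/test_repo_slow/_analyze");
            analyze.addParameter("blob_count", "1");
            analyze.addParameter("concurrency", "1");
            analyze.addParameter("max_blob_size", "2gb");
            analyze.addParameter("max_total_data_size", "2gb");
            analyze.addParameter("detailed", "false");
            analyze.addParameter("timeout", "1s");
            try {
                Response response = client.performRequest(analyze);
                System.out.println("unexpected success: " + response.getStatusLine());
            } catch (ResponseException e) {
                // At one byte per second the 2gb blob cannot be written within 1s, so the
                // analysis fails with a 500 wrapping a receive_timeout_transport_exception.
                System.out.println(e.getResponse().getStatusLine());
            }
        }
    }
}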
@@ -25,6 +25,7 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -188,6 +189,12 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
}
}

/**
* The atomic write API is based around a {@link BytesReference} which uses {@code int} for lengths and offsets and so on, so we can
* only use it to write a blob with size at most {@link Integer#MAX_VALUE}.
*/
static final long MAX_ATOMIC_WRITE_SIZE = Integer.MAX_VALUE;

/**
* Analysis on a single blob, performing the write(s) and orchestrating the read(s).
*/
@@ -265,10 +272,15 @@ static class BlobAnalysis {
}

void run() {
- writeRandomBlob(request.readEarly || random.nextBoolean(), true, this::doReadBeforeWriteComplete, write1Step);
+ writeRandomBlob(
+     request.readEarly || (request.targetLength <= MAX_ATOMIC_WRITE_SIZE && random.nextBoolean()),
+     true,
+     this::doReadBeforeWriteComplete,
+     write1Step
+ );

if (request.writeAndOverwrite) {
- assert request.targetLength <= Integer.MAX_VALUE : "oversized atomic write";
+ assert request.targetLength <= MAX_ATOMIC_WRITE_SIZE : "oversized atomic write";
write1Step.whenComplete(ignored -> writeRandomBlob(true, false, this::doReadAfterWrite, write2Step), ignored -> {});
} else {
write2Step.onResponse(null);
@@ -277,7 +289,7 @@ void run() {
}

private void writeRandomBlob(boolean atomic, boolean failIfExists, Runnable onLastRead, StepListener<WriteDetails> stepListener) {
- assert atomic == false || request.targetLength <= Integer.MAX_VALUE : "oversized atomic write";
+ assert atomic == false || request.targetLength <= MAX_ATOMIC_WRITE_SIZE : "oversized atomic write";
final RandomBlobContent content = new RandomBlobContent(
request.getRepositoryName(),
random.nextLong(),
@@ -296,7 +308,7 @@ private void writeRandomBlob(boolean atomic, boolean failIfExists, Runnable onLa
// E.g. for S3 blob containers we would like to choose somewhat more randomly between single-part and multi-part uploads,
// rather than relying on the usual distinction based on the size of the blob.

- if (atomic || (request.targetLength <= Integer.MAX_VALUE && random.nextBoolean())) {
+ if (atomic || (request.targetLength <= MAX_ATOMIC_WRITE_SIZE && random.nextBoolean())) {
final RandomBlobContentBytesReference bytesReference = new RandomBlobContentBytesReference(
content,
Math.toIntExact(request.getTargetLength())
@@ -613,7 +625,7 @@ public static class Request extends ActionRequest implements TaskAwareRequest {
boolean writeAndOverwrite
) {
assert 0 < targetLength;
- assert targetLength <= Integer.MAX_VALUE || (readEarly == false && writeAndOverwrite == false) : "oversized atomic write";
+ assert targetLength <= MAX_ATOMIC_WRITE_SIZE || (readEarly == false && writeAndOverwrite == false) : "oversized atomic write";
this.repositoryName = repositoryName;
this.blobPath = blobPath;
this.blobName = blobName;
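
The constant introduced above captures the reason for the change: the atomic write API takes a BytesReference, which addresses its contents with int offsets and lengths, so only blobs of at most Integer.MAX_VALUE bytes (just under 2 GiB) can take that path, and the analyzer now checks request.targetLength against MAX_ATOMIC_WRITE_SIZE before choosing it. The following standalone sketch (illustrative only, not Elasticsearch code) shows the arithmetic: a 2gb target length fails the gate and cannot even be narrowed to an int, which is exactly what Math.toIntExact(request.getTargetLength()) on the atomic branch would hit:

// Standalone sketch: why the atomic write path is capped at Integer.MAX_VALUE.
public class AtomicWriteSizeDemo {
    static final long MAX_ATOMIC_WRITE_SIZE = Integer.MAX_VALUE; // mirrors BlobAnalyzeAction

    public static void main(String[] args) {
        long targetLength = 2L * 1024 * 1024 * 1024; // 2 GiB, as requested by the new YAML test

        // The gate added in this PR: only small blobs may take the atomic path.
        System.out.println("atomic write permitted: " + (targetLength <= MAX_ATOMIC_WRITE_SIZE)); // false

        try {
            // Mirrors Math.toIntExact(request.getTargetLength()) on the atomic branch.
            int length = Math.toIntExact(targetLength);
            System.out.println("length fits in an int: " + length);
        } catch (ArithmeticException e) {
            System.out.println("2 GiB does not fit in an int: " + e.getMessage()); // "integer overflow"
        }
    }
}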
@@ -69,6 +69,7 @@
import java.util.function.LongSupplier;
import java.util.stream.IntStream;

import static org.elasticsearch.repositories.blobstore.testkit.BlobAnalyzeAction.MAX_ATOMIC_WRITE_SIZE;
import static org.elasticsearch.repositories.blobstore.testkit.SnapshotRepositoryTestKit.humanReadableNanos;

/**
@@ -451,7 +452,7 @@ public void run() {

for (int i = 0; i < request.getBlobCount(); i++) {
final long targetLength = blobSizes.get(i);
- final boolean smallBlob = targetLength <= Integer.MAX_VALUE; // avoid the atomic API for larger blobs
+ final boolean smallBlob = targetLength <= MAX_ATOMIC_WRITE_SIZE; // avoid the atomic API for larger blobs
final VerifyBlobTask verifyBlobTask = new VerifyBlobTask(
nodes.get(random.nextInt(nodes.size())),
new BlobAnalyzeAction.Request(
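
In the final hunk the coordinating action computes smallBlob from the same MAX_ATOMIC_WRITE_SIZE constant and uses it to keep oversized blobs away from the variants that require an atomic write. The request construction itself is truncated above, so the sketch below is only a simplified illustration of the invariant asserted in BlobAnalyzeAction.Request: readEarly and writeAndOverwrite may only be enabled when the blob is small enough for the atomic path.

// Simplified illustration (not the actual coordinator code): plan a per-blob task so
// that the read-early and write-and-overwrite variants are only chosen for small blobs.
import java.util.Random;

public class BlobPlanSketch {
    static final long MAX_ATOMIC_WRITE_SIZE = Integer.MAX_VALUE;

    static String plan(long targetLength, Random random) {
        final boolean smallBlob = targetLength <= MAX_ATOMIC_WRITE_SIZE; // avoid the atomic API for larger blobs
        final boolean readEarly = smallBlob && random.nextBoolean();
        final boolean writeAndOverwrite = smallBlob && random.nextBoolean();
        // Same invariant as the assertion in BlobAnalyzeAction.Request.
        assert targetLength <= MAX_ATOMIC_WRITE_SIZE || (readEarly == false && writeAndOverwrite == false);
        return "targetLength=" + targetLength + " readEarly=" + readEarly + " writeAndOverwrite=" + writeAndOverwrite;
    }

    public static void main(String[] args) {
        Random random = new Random(0);
        System.out.println(plan(1024, random));                    // small blob: variants may be chosen
        System.out.println(plan(3L * 1024 * 1024 * 1024, random)); // 3 GiB: both variants forced off
    }
}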