@@ -30,6 +30,7 @@
import com.microsoft.azure.storage.blob.BlobInputStream;
import com.microsoft.azure.storage.blob.BlobListingDetails;
import com.microsoft.azure.storage.blob.BlobProperties;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
@@ -328,7 +329,7 @@ public void writeBlob(String account, String container, String blobName, InputSt
final AccessCondition accessCondition =
failIfAlreadyExists ? AccessCondition.generateIfNotExistsCondition() : AccessCondition.generateEmptyCondition();
SocketAccess.doPrivilegedVoidException(() ->
blob.upload(inputStream, blobSize, accessCondition, null, client.v2().get()));
blob.upload(inputStream, blobSize, accessCondition, getBlobRequestOptionsForWriteBlob(), client.v2().get()));
} catch (final StorageException se) {
if (failIfAlreadyExists && se.getHttpStatusCode() == HttpURLConnection.HTTP_CONFLICT &&
StorageErrorCodeStrings.BLOB_ALREADY_EXISTS.equals(se.getErrorCode())) {
@@ -339,6 +340,11 @@ public void writeBlob(String account, String container, String blobName, InputSt
logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {}) - done", blobName, blobSize));
}

// package private for testing
BlobRequestOptions getBlobRequestOptionsForWriteBlob() {
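// returning null means the SDK's default request options apply
// (assumption: in Azure Storage SDK v8 the default single blob put threshold
// is 32 MB, so writeBlob calls below that size issue a single Put Blob request)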
return null;
}

static InputStream giveSocketPermissionsToStream(final InputStream stream) {
return new InputStream() {
@Override
@@ -21,27 +21,46 @@
import com.microsoft.azure.storage.Constants;
import com.microsoft.azure.storage.RetryExponentialRetry;
import com.microsoft.azure.storage.RetryPolicyFactory;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.test.BackgroundIndexer;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;

@SuppressForbidden(reason = "this test uses a HttpServer to emulate an Azure endpoint")
public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase {
@@ -90,7 +109,43 @@ protected Settings nodeSettings(int nodeOrdinal) {
}

/**
* AzureRepositoryPlugin that allows to set very low values for the Azure's client retry policy
* Test the snapshot and restore of an index which has large segments files.
*/
public void testSnapshotWithLargeSegmentFiles() throws Exception {
Review thread:
Contributor: Are we gonna build S3 multi-part uploads as well and then move this to the abstract parent? :)
Member (author): Yes, that's the idea :)
Member (author): I opened #46802

final String repository = createRepository(randomName());
final String index = "index-no-merges";
createIndex(index, Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.MINUS_ONE)
.build());

// the number of documents here dictates the size of the single segment;
// we want a large segment (1 MB+) so that the Azure SDK client executes Put Block API calls,
// with the size of each uploaded block defined by Constants.DEFAULT_STREAM_WRITE_IN_BYTES (~4 MB)
final long nbDocs = randomLongBetween(10_000L, 20_000L);
try (BackgroundIndexer indexer = new BackgroundIndexer(index, "_doc", client(), (int) nbDocs)) {
awaitBusy(() -> indexer.totalIndexedDocs() >= nbDocs);
}

flushAndRefresh(index);
ForceMergeResponse forceMerge = client().admin().indices().prepareForceMerge(index).setFlush(true).setMaxNumSegments(1).get();
assertThat(forceMerge.getSuccessfulShards(), equalTo(1));
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);

assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repository, "snapshot")
.setWaitForCompletion(true).setIndices(index));

assertAcked(client().admin().indices().prepareDelete(index));

assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repository, "snapshot").setWaitForCompletion(true));
ensureGreen(index);
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);
}

/**
* AzureRepositoryPlugin that allows setting low values for the Azure client's retry policy
* and for BlobRequestOptions#getSingleBlobPutThresholdInBytes().
*/
public static class TestAzureRepositoryPlugin extends AzureRepositoryPlugin {

@@ -105,6 +160,13 @@ AzureStorageService createAzureStoreService(final Settings settings) {
RetryPolicyFactory createRetryPolicy(final AzureStorageSettings azureStorageSettings) {
return new RetryExponentialRetry(1, 100, 500, azureStorageSettings.getMaxRetries());
}

@Override
BlobRequestOptions getBlobRequestOptionsForWriteBlob() {
BlobRequestOptions options = new BlobRequestOptions();
options.setSingleBlobPutThresholdInBytes(Math.toIntExact(ByteSizeUnit.MB.toBytes(1)));
Review comment (Member, author): This lowers the threshold at which the client switches from a single Put Blob request to Put Block + Put Block List requests.

return options;
}
};
}
}
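
For intuition, here is a minimal sketch (not part of the change) of how the SDK v8 client is expected to choose an upload strategy, assuming it splits streams into blocks of BlobRequestOptions#getStreamWriteSizeInBytes() (Constants.DEFAULT_STREAM_WRITE_IN_BYTES, roughly 4 MB, by default); the helper name is hypothetical:

static int expectedUploadRequests(long blobSize, int singlePutThresholdInBytes, int streamWriteSizeInBytes) {
    if (blobSize <= singlePutThresholdInBytes) {
        return 1; // a single Put Blob request uploads the whole blob
    }
    // otherwise: one Put Block request per chunk, plus a final Put Block List to commit them
    final int blocks = Math.toIntExact((blobSize + streamWriteSizeInBytes - 1) / streamWriteSizeInBytes);
    return blocks + 1;
}

With the 1 MB threshold configured above, the 1 MB+ segment produced by testSnapshotWithLargeSegmentFiles goes through one or more Put Block calls followed by a Put Block List, which is exactly the pair of code paths InternalHttpHandler below emulates.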
@@ -121,12 +183,36 @@ private static class InternalHttpHandler implements HttpHandler {
public void handle(final HttpExchange exchange) throws IOException {
final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString();
try {
if (Regex.simpleMatch("PUT /container/*", request)) {
blobs.put(exchange.getRequestURI().toString(), Streams.readFully(exchange.getRequestBody()));
if (Regex.simpleMatch("PUT /container/*blockid=*", request)) {
final Map<String, String> params = new HashMap<>();
RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);

final String blockId = params.get("blockid");
blobs.put(blockId, Streams.readFully(exchange.getRequestBody()));
exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);

} else if (Regex.simpleMatch("PUT /container/*comp=blocklist*", request)) {
final String blockList = Streams.copyToString(new InputStreamReader(exchange.getRequestBody(), StandardCharsets.UTF_8));
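// the request body is the Put Block List XML document, e.g. (illustrative block ids):
// <?xml version="1.0" encoding="utf-8"?>
// <BlockList>
//   <Latest>AAAAAA==</Latest>
//   <Latest>AQAAAA==</Latest>
// </BlockList>
// the string manipulation below extracts the base64 block ids in order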
final List<String> blockIds = Arrays.stream(blockList.split("<Latest>"))
.filter(line -> line.contains("</Latest>"))
.map(line -> line.substring(0, line.indexOf("</Latest>")))
.collect(Collectors.toList());

final ByteArrayOutputStream blob = new ByteArrayOutputStream();
for (String blockId : blockIds) {
BytesReference block = blobs.remove(blockId);
assert block != null;
block.writeTo(blob);
}
blobs.put(exchange.getRequestURI().getPath(), new BytesArray(blob.toByteArray()));
exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);

} else if (Regex.simpleMatch("PUT /container/*", request)) {
blobs.put(exchange.getRequestURI().getPath(), Streams.readFully(exchange.getRequestBody()));
exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);

} else if (Regex.simpleMatch("HEAD /container/*", request)) {
BytesReference blob = blobs.get(exchange.getRequestURI().toString());
final BytesReference blob = blobs.get(exchange.getRequestURI().getPath());
Review comment (Member, author): This was wrong (i.e., the query parameters were part of the blob name).

if (blob == null) {
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
return;
@@ -136,20 +222,28 @@ public void handle(final HttpExchange exchange) throws IOException {
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);

} else if (Regex.simpleMatch("GET /container/*", request)) {
final BytesReference blob = blobs.get(exchange.getRequestURI().toString());
final BytesReference blob = blobs.get(exchange.getRequestURI().getPath());
if (blob == null) {
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
return;
}

final String range = exchange.getRequestHeaders().getFirst(Constants.HeaderConstants.STORAGE_RANGE_HEADER);
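// the mock assumes the client always sends a bounded range header,
// e.g. "x-ms-range: bytes=0-4194303" (illustrative value); the assertTrue
// below fails fast if that assumption is ever violated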
final Matcher matcher = Pattern.compile("^bytes=([0-9]+)-([0-9]+)$").matcher(range);
assertTrue(matcher.matches());

final int start = Integer.parseInt(matcher.group(1));
final int length = Integer.parseInt(matcher.group(2)) - start + 1;

exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(blob.length()));
exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(length));
exchange.getResponseHeaders().add("x-ms-blob-type", "blockblob");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), blob.length());
blob.writeTo(exchange.getResponseBody());
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), length);
exchange.getResponseBody().write(blob.toBytesRef().bytes, start, length);

} else if (Regex.simpleMatch("DELETE /container/*", request)) {
Streams.readFully(exchange.getRequestBody());
blobs.entrySet().removeIf(blob -> blob.getKey().startsWith(exchange.getRequestURI().toString()));
blobs.entrySet().removeIf(blob -> blob.getKey().startsWith(exchange.getRequestURI().getPath()));
exchange.sendResponseHeaders(RestStatus.ACCEPTED.getStatus(), -1);

} else if (Regex.simpleMatch("GET /container?restype=container&comp=list*", request)) {
@@ -200,8 +294,11 @@ private static class AzureErroneousHttpHandler extends ErroneousHttpHandler {

@Override
protected String requestUniqueId(final HttpExchange exchange) {
// Azure SDK client provides a unique ID per request
return exchange.getRequestHeaders().getFirst(Constants.HeaderConstants.CLIENT_REQUEST_ID_HEADER);
final String requestId = exchange.getRequestHeaders().getFirst(Constants.HeaderConstants.CLIENT_REQUEST_ID_HEADER);
Review comment (Member, author): I had not noticed it, but HEAD and GET blob requests share the same request id: when reading a blob from Azure through an input stream, the SDK first executes a HEAD request to learn the size of the object, then retrieves the content with one or more GET requests, all carrying the same request id. Appending the HTTP method and the range header makes the id unique per request, e.g. "GET <client-request-id> bytes=0-4194303" (illustrative).

final String range = exchange.getRequestHeaders().getFirst(Constants.HeaderConstants.STORAGE_RANGE_HEADER);
return exchange.getRequestMethod()
+ " " + requestId
+ (range != null ? " " + range : "");
}
}
}
@@ -307,7 +307,7 @@ private static void assertSuccessfulSnapshot(CreateSnapshotResponse response) {
assertThat(response.getSnapshotInfo().successfulShards(), equalTo(response.getSnapshotInfo().totalShards()));
}

private static void assertSuccessfulRestore(RestoreSnapshotRequestBuilder requestBuilder) {
protected static void assertSuccessfulRestore(RestoreSnapshotRequestBuilder requestBuilder) {
RestoreSnapshotResponse response = requestBuilder.get();
assertSuccessfulRestore(response);
}