diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java index 1ef85b068348e..3e5638893c8ac 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java @@ -30,6 +30,7 @@ import com.microsoft.azure.storage.blob.BlobInputStream; import com.microsoft.azure.storage.blob.BlobListingDetails; import com.microsoft.azure.storage.blob.BlobProperties; +import com.microsoft.azure.storage.blob.BlobRequestOptions; import com.microsoft.azure.storage.blob.CloudBlob; import com.microsoft.azure.storage.blob.CloudBlobClient; import com.microsoft.azure.storage.blob.CloudBlobContainer; @@ -328,7 +329,7 @@ public void writeBlob(String account, String container, String blobName, InputSt final AccessCondition accessCondition = failIfAlreadyExists ? AccessCondition.generateIfNotExistsCondition() : AccessCondition.generateEmptyCondition(); SocketAccess.doPrivilegedVoidException(() -> - blob.upload(inputStream, blobSize, accessCondition, null, client.v2().get())); + blob.upload(inputStream, blobSize, accessCondition, getBlobRequestOptionsForWriteBlob(), client.v2().get())); } catch (final StorageException se) { if (failIfAlreadyExists && se.getHttpStatusCode() == HttpURLConnection.HTTP_CONFLICT && StorageErrorCodeStrings.BLOB_ALREADY_EXISTS.equals(se.getErrorCode())) { @@ -339,6 +340,11 @@ public void writeBlob(String account, String container, String blobName, InputSt logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {}) - done", blobName, blobSize)); } + // package private for testing + BlobRequestOptions getBlobRequestOptionsForWriteBlob() { + return null; + } + static InputStream giveSocketPermissionsToStream(final InputStream stream) { return new InputStream() { @Override diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java index 12d212cc1bbe7..6960eced0a701 100644 --- a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java +++ b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java @@ -21,27 +21,46 @@ import com.microsoft.azure.storage.Constants; import com.microsoft.azure.storage.RetryExponentialRetry; import com.microsoft.azure.storage.RetryPolicyFactory; +import com.microsoft.azure.storage.blob.BlobRequestOptions; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; +import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestUtils; +import org.elasticsearch.test.BackgroundIndexer; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.Base64; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.hamcrest.Matchers.equalTo; @SuppressForbidden(reason = "this test uses a HttpServer to emulate an Azure endpoint") public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase { @@ -90,7 +109,43 @@ protected Settings nodeSettings(int nodeOrdinal) { } /** - * AzureRepositoryPlugin that allows to set very low values for the Azure's client retry policy + * Test the snapshot and restore of an index which has large segments files. + */ + public void testSnapshotWithLargeSegmentFiles() throws Exception { + final String repository = createRepository(randomName()); + final String index = "index-no-merges"; + createIndex(index, Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.MINUS_ONE) + .build()); + + // the number of documents here dictates the size of the single segment + // we want a large segment (1Mb+) so that Azure SDK client executes Put Block API calls + // the size of each uploaded block is defined by Constants.DEFAULT_STREAM_WRITE_IN_BYTES (~4Mb) + final long nbDocs = randomLongBetween(10_000L, 20_000L); + try (BackgroundIndexer indexer = new BackgroundIndexer(index, "_doc", client(), (int) nbDocs)) { + awaitBusy(() -> indexer.totalIndexedDocs() >= nbDocs); + } + + flushAndRefresh(index); + ForceMergeResponse forceMerge = client().admin().indices().prepareForceMerge(index).setFlush(true).setMaxNumSegments(1).get(); + assertThat(forceMerge.getSuccessfulShards(), equalTo(1)); + assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs); + + assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repository, "snapshot") + .setWaitForCompletion(true).setIndices(index)); + + assertAcked(client().admin().indices().prepareDelete(index)); + + assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repository, "snapshot").setWaitForCompletion(true)); + ensureGreen(index); + assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs); + } + + /** + * AzureRepositoryPlugin that allows to set low values for the Azure's client retry policy + * and for BlobRequestOptions#getSingleBlobPutThresholdInBytes(). */ public static class TestAzureRepositoryPlugin extends AzureRepositoryPlugin { @@ -105,6 +160,13 @@ AzureStorageService createAzureStoreService(final Settings settings) { RetryPolicyFactory createRetryPolicy(final AzureStorageSettings azureStorageSettings) { return new RetryExponentialRetry(1, 100, 500, azureStorageSettings.getMaxRetries()); } + + @Override + BlobRequestOptions getBlobRequestOptionsForWriteBlob() { + BlobRequestOptions options = new BlobRequestOptions(); + options.setSingleBlobPutThresholdInBytes(Math.toIntExact(ByteSizeUnit.MB.toBytes(1))); + return options; + } }; } } @@ -121,12 +183,36 @@ private static class InternalHttpHandler implements HttpHandler { public void handle(final HttpExchange exchange) throws IOException { final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString(); try { - if (Regex.simpleMatch("PUT /container/*", request)) { - blobs.put(exchange.getRequestURI().toString(), Streams.readFully(exchange.getRequestBody())); + if (Regex.simpleMatch("PUT /container/*blockid=*", request)) { + final Map params = new HashMap<>(); + RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params); + + final String blockId = params.get("blockid"); + blobs.put(blockId, Streams.readFully(exchange.getRequestBody())); + exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1); + + } else if (Regex.simpleMatch("PUT /container/*comp=blocklist*", request)) { + final String blockList = Streams.copyToString(new InputStreamReader(exchange.getRequestBody(), StandardCharsets.UTF_8)); + final List blockIds = Arrays.stream(blockList.split("")) + .filter(line -> line.contains("")) + .map(line -> line.substring(0, line.indexOf(""))) + .collect(Collectors.toList()); + + final ByteArrayOutputStream blob = new ByteArrayOutputStream(); + for (String blockId : blockIds) { + BytesReference block = blobs.remove(blockId); + assert block != null; + block.writeTo(blob); + } + blobs.put(exchange.getRequestURI().getPath(), new BytesArray(blob.toByteArray())); + exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1); + + } else if (Regex.simpleMatch("PUT /container/*", request)) { + blobs.put(exchange.getRequestURI().getPath(), Streams.readFully(exchange.getRequestBody())); exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1); } else if (Regex.simpleMatch("HEAD /container/*", request)) { - BytesReference blob = blobs.get(exchange.getRequestURI().toString()); + final BytesReference blob = blobs.get(exchange.getRequestURI().getPath()); if (blob == null) { exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1); return; @@ -136,20 +222,28 @@ public void handle(final HttpExchange exchange) throws IOException { exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1); } else if (Regex.simpleMatch("GET /container/*", request)) { - final BytesReference blob = blobs.get(exchange.getRequestURI().toString()); + final BytesReference blob = blobs.get(exchange.getRequestURI().getPath()); if (blob == null) { exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1); return; } + + final String range = exchange.getRequestHeaders().getFirst(Constants.HeaderConstants.STORAGE_RANGE_HEADER); + final Matcher matcher = Pattern.compile("^bytes=([0-9]+)-([0-9]+)$").matcher(range); + assertTrue(matcher.matches()); + + final int start = Integer.parseInt(matcher.group(1)); + final int length = Integer.parseInt(matcher.group(2)) - start + 1; + exchange.getResponseHeaders().add("Content-Type", "application/octet-stream"); - exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(blob.length())); + exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(length)); exchange.getResponseHeaders().add("x-ms-blob-type", "blockblob"); - exchange.sendResponseHeaders(RestStatus.OK.getStatus(), blob.length()); - blob.writeTo(exchange.getResponseBody()); + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), length); + exchange.getResponseBody().write(blob.toBytesRef().bytes, start, length); } else if (Regex.simpleMatch("DELETE /container/*", request)) { Streams.readFully(exchange.getRequestBody()); - blobs.entrySet().removeIf(blob -> blob.getKey().startsWith(exchange.getRequestURI().toString())); + blobs.entrySet().removeIf(blob -> blob.getKey().startsWith(exchange.getRequestURI().getPath())); exchange.sendResponseHeaders(RestStatus.ACCEPTED.getStatus(), -1); } else if (Regex.simpleMatch("GET /container?restype=container&comp=list*", request)) { @@ -200,8 +294,11 @@ private static class AzureErroneousHttpHandler extends ErroneousHttpHandler { @Override protected String requestUniqueId(final HttpExchange exchange) { - // Azure SDK client provides a unique ID per request - return exchange.getRequestHeaders().getFirst(Constants.HeaderConstants.CLIENT_REQUEST_ID_HEADER); + final String requestId = exchange.getRequestHeaders().getFirst(Constants.HeaderConstants.CLIENT_REQUEST_ID_HEADER); + final String range = exchange.getRequestHeaders().getFirst(Constants.HeaderConstants.STORAGE_RANGE_HEADER); + return exchange.getRequestMethod() + + " " + requestId + + (range != null ? " " + range : ""); } } } diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java index 09814c33105d4..41a92eed28967 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java @@ -307,7 +307,7 @@ private static void assertSuccessfulSnapshot(CreateSnapshotResponse response) { assertThat(response.getSnapshotInfo().successfulShards(), equalTo(response.getSnapshotInfo().totalShards())); } - private static void assertSuccessfulRestore(RestoreSnapshotRequestBuilder requestBuilder) { + protected static void assertSuccessfulRestore(RestoreSnapshotRequestBuilder requestBuilder) { RestoreSnapshotResponse response = requestBuilder.get(); assertSuccessfulRestore(response); }