Skip to content

Commit 799f7de

Browse files
committed
Add block support to AzureBlobStoreRepositoryTests (#46664)
This commit adds support for the Put Block API to the internal HTTP server used in the Azure repository integration tests. This makes it possible to test the behavior of the Azure SDK client when the Azure Storage service returns errors while uploading a blob in multiple blocks or while downloading a blob using ranged downloads.
1 parent fd42358 commit 799f7de

File tree

2 files changed

+115
-12
lines changed

2 files changed

+115
-12
lines changed

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.microsoft.azure.storage.blob.BlobInputStream;
3131
import com.microsoft.azure.storage.blob.BlobListingDetails;
3232
import com.microsoft.azure.storage.blob.BlobProperties;
33+
import com.microsoft.azure.storage.blob.BlobRequestOptions;
3334
import com.microsoft.azure.storage.blob.CloudBlob;
3435
import com.microsoft.azure.storage.blob.CloudBlobClient;
3536
import com.microsoft.azure.storage.blob.CloudBlobContainer;
@@ -329,7 +330,7 @@ public void writeBlob(String account, String container, String blobName, InputSt
329330
final AccessCondition accessCondition =
330331
failIfAlreadyExists ? AccessCondition.generateIfNotExistsCondition() : AccessCondition.generateEmptyCondition();
331332
SocketAccess.doPrivilegedVoidException(() ->
332-
blob.upload(inputStream, blobSize, accessCondition, null, client.v2().get()));
333+
blob.upload(inputStream, blobSize, accessCondition, getBlobRequestOptionsForWriteBlob(), client.v2().get()));
333334
} catch (final StorageException se) {
334335
if (failIfAlreadyExists && se.getHttpStatusCode() == HttpURLConnection.HTTP_CONFLICT &&
335336
StorageErrorCodeStrings.BLOB_ALREADY_EXISTS.equals(se.getErrorCode())) {
@@ -340,6 +341,11 @@ public void writeBlob(String account, String container, String blobName, InputSt
340341
logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {}) - done", blobName, blobSize));
341342
}
342343

344+
// package-private so that tests can override the request options used by writeBlob
345+
BlobRequestOptions getBlobRequestOptionsForWriteBlob() {
346+
return null;
347+
}
348+
343349
static InputStream giveSocketPermissionsToStream(final InputStream stream) {
344350
return new InputStream() {
345351
@Override

plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java

Lines changed: 108 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,27 +21,46 @@
2121
import com.microsoft.azure.storage.Constants;
2222
import com.microsoft.azure.storage.RetryExponentialRetry;
2323
import com.microsoft.azure.storage.RetryPolicyFactory;
24+
import com.microsoft.azure.storage.blob.BlobRequestOptions;
2425
import com.sun.net.httpserver.HttpExchange;
2526
import com.sun.net.httpserver.HttpHandler;
27+
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
28+
import org.elasticsearch.cluster.metadata.IndexMetaData;
2629
import org.elasticsearch.common.SuppressForbidden;
30+
import org.elasticsearch.common.bytes.BytesArray;
2731
import org.elasticsearch.common.bytes.BytesReference;
2832
import org.elasticsearch.common.io.Streams;
2933
import org.elasticsearch.common.regex.Regex;
3034
import org.elasticsearch.common.settings.MockSecureSettings;
3135
import org.elasticsearch.common.settings.Settings;
36+
import org.elasticsearch.common.unit.ByteSizeUnit;
37+
import org.elasticsearch.common.unit.TimeValue;
38+
import org.elasticsearch.index.IndexSettings;
3239
import org.elasticsearch.plugins.Plugin;
3340
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
3441
import org.elasticsearch.rest.RestStatus;
3542
import org.elasticsearch.rest.RestUtils;
43+
import org.elasticsearch.test.BackgroundIndexer;
3644

45+
import java.io.ByteArrayOutputStream;
3746
import java.io.IOException;
47+
import java.io.InputStreamReader;
3848
import java.nio.charset.StandardCharsets;
49+
import java.util.Arrays;
3950
import java.util.Base64;
4051
import java.util.Collection;
4152
import java.util.Collections;
4253
import java.util.HashMap;
54+
import java.util.List;
4355
import java.util.Map;
4456
import java.util.concurrent.ConcurrentHashMap;
57+
import java.util.regex.Matcher;
58+
import java.util.regex.Pattern;
59+
import java.util.stream.Collectors;
60+
61+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
62+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
63+
import static org.hamcrest.Matchers.equalTo;
4564

4665
@SuppressForbidden(reason = "this test uses a HttpServer to emulate an Azure endpoint")
4766
public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase {
@@ -90,7 +109,43 @@ protected Settings nodeSettings(int nodeOrdinal) {
90109
}
91110

92111
/**
93-
* AzureRepositoryPlugin that allows to set very low values for the Azure's client retry policy
112+
* Test the snapshot and restore of an index which has large segment files.
113+
*/
114+
public void testSnapshotWithLargeSegmentFiles() throws Exception {
115+
final String repository = createRepository(randomName());
116+
final String index = "index-no-merges";
117+
createIndex(index, Settings.builder()
118+
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
119+
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
120+
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.MINUS_ONE)
121+
.build());
122+
123+
// the number of documents here dictates the size of the single segment
124+
// we want a large segment (1MB+) so that the Azure SDK client executes Put Block API calls
125+
// the size of each uploaded block is defined by Constants.DEFAULT_STREAM_WRITE_IN_BYTES (~4MB)
126+
final long nbDocs = randomLongBetween(10_000L, 20_000L);
127+
try (BackgroundIndexer indexer = new BackgroundIndexer(index, "_doc", client(), (int) nbDocs)) {
128+
awaitBusy(() -> indexer.totalIndexedDocs() >= nbDocs);
129+
}
130+
131+
flushAndRefresh(index);
132+
ForceMergeResponse forceMerge = client().admin().indices().prepareForceMerge(index).setFlush(true).setMaxNumSegments(1).get();
133+
assertThat(forceMerge.getSuccessfulShards(), equalTo(1));
134+
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);
135+
136+
assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repository, "snapshot")
137+
.setWaitForCompletion(true).setIndices(index));
138+
139+
assertAcked(client().admin().indices().prepareDelete(index));
140+
141+
assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repository, "snapshot").setWaitForCompletion(true));
142+
ensureGreen(index);
143+
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);
144+
}
145+
146+
/**
147+
* AzureRepositoryPlugin that allows setting low values for the Azure client's retry policy
148+
* and for BlobRequestOptions#getSingleBlobPutThresholdInBytes().
94149
*/
95150
public static class TestAzureRepositoryPlugin extends AzureRepositoryPlugin {
96151

@@ -105,6 +160,13 @@ AzureStorageService createAzureStoreService(final Settings settings) {
105160
RetryPolicyFactory createRetryPolicy(final AzureStorageSettings azureStorageSettings) {
106161
return new RetryExponentialRetry(1, 100, 500, azureStorageSettings.getMaxRetries());
107162
}
163+
164+
@Override
165+
BlobRequestOptions getBlobRequestOptionsForWriteBlob() {
166+
BlobRequestOptions options = new BlobRequestOptions();
167+
options.setSingleBlobPutThresholdInBytes(Math.toIntExact(ByteSizeUnit.MB.toBytes(1)));
168+
return options;
169+
}
108170
};
109171
}
110172
}
@@ -121,12 +183,36 @@ private static class InternalHttpHandler implements HttpHandler {
121183
public void handle(final HttpExchange exchange) throws IOException {
122184
final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString();
123185
try {
124-
if (Regex.simpleMatch("PUT /container/*", request)) {
125-
blobs.put(exchange.getRequestURI().toString(), Streams.readFully(exchange.getRequestBody()));
186+
if (Regex.simpleMatch("PUT /container/*blockid=*", request)) {
187+
final Map<String, String> params = new HashMap<>();
188+
RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
189+
190+
final String blockId = params.get("blockid");
191+
blobs.put(blockId, Streams.readFully(exchange.getRequestBody()));
192+
exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
193+
194+
} else if (Regex.simpleMatch("PUT /container/*comp=blocklist*", request)) {
195+
final String blockList = Streams.copyToString(new InputStreamReader(exchange.getRequestBody(), StandardCharsets.UTF_8));
196+
final List<String> blockIds = Arrays.stream(blockList.split("<Latest>"))
197+
.filter(line -> line.contains("</Latest>"))
198+
.map(line -> line.substring(0, line.indexOf("</Latest>")))
199+
.collect(Collectors.toList());
200+
201+
final ByteArrayOutputStream blob = new ByteArrayOutputStream();
202+
for (String blockId : blockIds) {
203+
BytesReference block = blobs.remove(blockId);
204+
assert block != null;
205+
block.writeTo(blob);
206+
}
207+
blobs.put(exchange.getRequestURI().getPath(), new BytesArray(blob.toByteArray()));
208+
exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
209+
210+
} else if (Regex.simpleMatch("PUT /container/*", request)) {
211+
blobs.put(exchange.getRequestURI().getPath(), Streams.readFully(exchange.getRequestBody()));
126212
exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
127213

128214
} else if (Regex.simpleMatch("HEAD /container/*", request)) {
129-
BytesReference blob = blobs.get(exchange.getRequestURI().toString());
215+
final BytesReference blob = blobs.get(exchange.getRequestURI().getPath());
130216
if (blob == null) {
131217
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
132218
return;
@@ -136,20 +222,28 @@ public void handle(final HttpExchange exchange) throws IOException {
136222
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
137223

138224
} else if (Regex.simpleMatch("GET /container/*", request)) {
139-
final BytesReference blob = blobs.get(exchange.getRequestURI().toString());
225+
final BytesReference blob = blobs.get(exchange.getRequestURI().getPath());
140226
if (blob == null) {
141227
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
142228
return;
143229
}
230+
231+
final String range = exchange.getRequestHeaders().getFirst(Constants.HeaderConstants.STORAGE_RANGE_HEADER);
232+
final Matcher matcher = Pattern.compile("^bytes=([0-9]+)-([0-9]+)$").matcher(range);
233+
assertTrue(matcher.matches());
234+
235+
final int start = Integer.parseInt(matcher.group(1));
236+
final int length = Integer.parseInt(matcher.group(2)) - start + 1;
237+
144238
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
145-
exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(blob.length()));
239+
exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(length));
146240
exchange.getResponseHeaders().add("x-ms-blob-type", "blockblob");
147-
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), blob.length());
148-
blob.writeTo(exchange.getResponseBody());
241+
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), length);
242+
exchange.getResponseBody().write(blob.toBytesRef().bytes, start, length);
149243

150244
} else if (Regex.simpleMatch("DELETE /container/*", request)) {
151245
Streams.readFully(exchange.getRequestBody());
152-
blobs.entrySet().removeIf(blob -> blob.getKey().startsWith(exchange.getRequestURI().toString()));
246+
blobs.entrySet().removeIf(blob -> blob.getKey().startsWith(exchange.getRequestURI().getPath()));
153247
exchange.sendResponseHeaders(RestStatus.ACCEPTED.getStatus(), -1);
154248

155249
} else if (Regex.simpleMatch("GET /container?restype=container&comp=list*", request)) {
@@ -200,8 +294,11 @@ private static class AzureErroneousHttpHandler extends ErroneousHttpHandler {
200294

201295
@Override
202296
protected String requestUniqueId(final HttpExchange exchange) {
203-
// Azure SDK client provides a unique ID per request
204-
return exchange.getRequestHeaders().getFirst(Constants.HeaderConstants.CLIENT_REQUEST_ID_HEADER);
297+
final String requestId = exchange.getRequestHeaders().getFirst(Constants.HeaderConstants.CLIENT_REQUEST_ID_HEADER);
298+
final String range = exchange.getRequestHeaders().getFirst(Constants.HeaderConstants.STORAGE_RANGE_HEADER);
299+
return exchange.getRequestMethod()
300+
+ " " + requestId
301+
+ (range != null ? " " + range : "");
205302
}
206303
}
207304
}

0 commit comments

Comments
 (0)