Skip to content

Commit

Permalink
Add integration test for Azure multi block uploads
Browse files Browse the repository at this point in the history
Relates elastic#68957
Backport of elastic#69267
  • Loading branch information
fcofdez committed Mar 1, 2021
1 parent fef107c commit 043c4f4
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
import org.elasticsearch.rest.RestStatus;
Expand Down Expand Up @@ -54,6 +55,7 @@ protected String repositoryType() {
protected Settings repositorySettings(String repoName) {
Settings.Builder settingsBuilder = Settings.builder()
.put(super.repositorySettings(repoName))
.put(AzureRepository.Repository.MAX_SINGLE_PART_UPLOAD_SIZE_SETTING.getKey(), new ByteSizeValue(1, ByteSizeUnit.MB))
.put(AzureRepository.Repository.CONTAINER_SETTING.getKey(), "container")
.put(AzureStorageSettings.ACCOUNT_SETTING.getKey(), "test");
if (randomBoolean()) {
Expand Down Expand Up @@ -119,11 +121,6 @@ RequestRetryOptions getRetryOptions(LocationMode locationMode, AzureStorageSetti
long getUploadBlockSize() {
return ByteSizeUnit.MB.toBytes(1);
}

@Override
long getSizeThresholdForMultiBlockUpload() {
return ByteSizeUnit.MB.toBytes(1);
}
};
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,31 @@

package org.elasticsearch.repositories.azure;

import static org.hamcrest.Matchers.blankOrNullString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;

import java.net.HttpURLConnection;
import java.util.Collection;

import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.models.BlobStorageException;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.SecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.AbstractThirdPartyRepositoryTestCase;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;

import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.models.BlobStorageException;
import java.io.ByteArrayInputStream;
import java.net.HttpURLConnection;
import java.util.Collection;

import static org.hamcrest.Matchers.blankOrNullString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;

public class AzureStorageCleanupThirdPartyTests extends AbstractThirdPartyRepositoryTestCase {

Expand Down Expand Up @@ -78,6 +82,7 @@ protected void createRepository(String repoName) {
.setSettings(Settings.builder()
.put("container", System.getProperty("test.azure.container"))
.put("base_path", System.getProperty("test.azure.base"))
.put("max_single_part_upload_size", new ByteSizeValue(1, ByteSizeUnit.MB))
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
if (Strings.hasText(System.getProperty("test.azure.sas_token"))) {
Expand Down Expand Up @@ -111,4 +116,18 @@ private void ensureSasTokenPermissions() {
}));
future.actionGet();
}

/**
 * Writes a blob larger than the repository's {@code max_single_part_upload_size}
 * (configured to 1 MB for this suite) so the upload is forced down the
 * multi-block (staged) code path, then deletes the container contents.
 */
public void testMultiBlockUpload() throws Exception {
    final BlobStoreRepository repo = getRepository();
    // The configured max_single_part_upload_size for this test suite is 1 MB,
    // so a 2 MB blob must be uploaded as multiple blocks.
    final int blobSize = ByteSizeUnit.MB.toIntBytes(2);
    final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
    // Blob store operations must not run on the test thread; dispatch to the generic pool.
    repo.threadPool().generic().execute(ActionRunnable.run(future, () -> {
        final BlobContainer blobContainer = repo.blobStore().blobContainer(repo.basePath().add("large_write"));
        blobContainer.writeBlob(UUIDs.base64UUID(),
            new ByteArrayInputStream(randomByteArrayOfLength(blobSize)), blobSize, false);
        blobContainer.delete();
    }));
    // actionGet() for consistency with ensureSasTokenPermissions(): it unwraps
    // ExecutionException so a failed upload surfaces as the underlying cause.
    future.actionGet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public class AzureBlobStore implements BlobStore {
private final String clientName;
private final String container;
private final LocationMode locationMode;
private final ByteSizeValue maxSinglePartUploadSize;

private final Stats stats = new Stats();
private final BiConsumer<String, URL> statsConsumer;
Expand All @@ -94,6 +95,7 @@ public AzureBlobStore(RepositoryMetadata metadata, AzureStorageService service)
this.service = service;
// locationMode is set per repository, not per client
this.locationMode = Repository.LOCATION_MODE_SETTING.get(metadata.settings());
this.maxSinglePartUploadSize = Repository.MAX_SINGLE_PART_UPLOAD_SIZE_SETTING.get(metadata.settings());

List<RequestStatsCollector> requestStatsCollectors = Arrays.asList(
RequestStatsCollector.create(
Expand Down Expand Up @@ -553,7 +555,7 @@ static Tuple<Long, Long> numberOfMultiparts(final long totalSize, final long par
}

long getLargeBlobThresholdInBytes() {
return service.getSizeThresholdForMultiBlockUpload();
return maxSinglePartUploadSize.getBytes();
}

long getUploadBlockSize() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
Expand Down Expand Up @@ -61,6 +62,10 @@ public static final class Repository {
Setting.byteSizeSetting("chunk_size", MAX_CHUNK_SIZE, MIN_CHUNK_SIZE, MAX_CHUNK_SIZE, Property.NodeScope);
public static final Setting<Boolean> COMPRESS_SETTING = Setting.boolSetting("compress", false, Property.NodeScope);
public static final Setting<Boolean> READONLY_SETTING = Setting.boolSetting(READONLY_SETTING_KEY, false, Property.NodeScope);
// see ModelHelper.BLOB_DEFAULT_MAX_SINGLE_UPLOAD_SIZE
private static final ByteSizeValue DEFAULT_MAX_SINGLE_UPLOAD_SIZE = new ByteSizeValue(256, ByteSizeUnit.MB);
public static final Setting<ByteSizeValue> MAX_SINGLE_PART_UPLOAD_SIZE_SETTING =
Setting.byteSizeSetting("max_single_part_upload_size", DEFAULT_MAX_SINGLE_UPLOAD_SIZE, Property.NodeScope);
}

private final BlobPath basePath;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ public class AzureStorageService {
*/
public static final ByteSizeValue MAX_CHUNK_SIZE = new ByteSizeValue(MAX_BLOB_SIZE , ByteSizeUnit.BYTES);

// see ModelHelper.BLOB_DEFAULT_MAX_SINGLE_UPLOAD_SIZE
private static final long DEFAULT_MAX_SINGLE_UPLOAD_SIZE = new ByteSizeValue(256, ByteSizeUnit.MB).getBytes();
private static final long DEFAULT_UPLOAD_BLOCK_SIZE = DEFAULT_BLOCK_SIZE.getBytes();

// 'package' for testing
Expand Down Expand Up @@ -125,11 +123,6 @@ long getUploadBlockSize() {
return DEFAULT_UPLOAD_BLOCK_SIZE;
}

// non-static, package private for testing
long getSizeThresholdForMultiBlockUpload() {
return DEFAULT_MAX_SINGLE_UPLOAD_SIZE;
}

int getMaxReadRetries(String clientName) {
AzureStorageSettings azureStorageSettings = getClientSettings(clientName);
return azureStorageSettings.getMaxRetries();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.mocksocket.MockHttpServer;
Expand Down Expand Up @@ -64,6 +65,7 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.elasticsearch.repositories.azure.AzureRepository.Repository.CONTAINER_SETTING;
import static org.elasticsearch.repositories.azure.AzureRepository.Repository.LOCATION_MODE_SETTING;
import static org.elasticsearch.repositories.azure.AzureRepository.Repository.MAX_SINGLE_PART_UPLOAD_SIZE_SETTING;
import static org.elasticsearch.repositories.azure.AzureStorageSettings.ACCOUNT_SETTING;
import static org.elasticsearch.repositories.azure.AzureStorageSettings.ENDPOINT_SUFFIX_SETTING;
import static org.elasticsearch.repositories.azure.AzureStorageSettings.KEY_SETTING;
Expand Down Expand Up @@ -156,11 +158,6 @@ long getUploadBlockSize() {
return ByteSizeUnit.MB.toBytes(1);
}

@Override
long getSizeThresholdForMultiBlockUpload() {
return ByteSizeUnit.MB.toBytes(1);
}

@Override
int getMaxReadRetries(String clientName) {
return maxRetries;
Expand All @@ -172,6 +169,7 @@ int getMaxReadRetries(String clientName) {
.put(CONTAINER_SETTING.getKey(), "container")
.put(ACCOUNT_SETTING.getKey(), clientName)
.put(LOCATION_MODE_SETTING.getKey(), locationMode)
.put(MAX_SINGLE_PART_UPLOAD_SIZE_SETTING.getKey(), new ByteSizeValue(1, ByteSizeUnit.MB))
.build());

return new AzureBlobContainer(BlobPath.cleanPath(), new AzureBlobStore(repositoryMetadata, service));
Expand Down

0 comments on commit 043c4f4

Please sign in to comment.