From d188b6cdcb68ca3c635bba1ba081202394d9574c Mon Sep 17 00:00:00 2001 From: Gauri Prasad Date: Tue, 22 Sep 2020 16:30:24 -0700 Subject: [PATCH 1/4] Added support to specify chunk size in BlobInputStream --- .../blob/options/BlobInputStreamOptions.java | 71 +++++++++++++++++++ .../blob/specialized/BlobClientBase.java | 22 +++++- .../blob/specialized/BlobInputStream.java | 46 ++---------- .../BlockBlobInputOutputStreamTest.groovy | 49 ++++++++++++- 4 files changed, 144 insertions(+), 44 deletions(-) create mode 100644 sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/options/BlobInputStreamOptions.java diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/options/BlobInputStreamOptions.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/options/BlobInputStreamOptions.java new file mode 100644 index 0000000000000..1230aa04687ae --- /dev/null +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/options/BlobInputStreamOptions.java @@ -0,0 +1,71 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.storage.blob.options; + +import com.azure.core.annotation.Fluent; +import com.azure.storage.blob.models.BlobRange; +import com.azure.storage.blob.models.BlobRequestConditions; + +/** + * Extended options that may be passed when opening a blob input stream. + */ +@Fluent +public class BlobInputStreamOptions { + + private BlobRange range; + private BlobRequestConditions requestConditions; + private Integer chunkSize; + + /** + * @return {@link BlobRange} + */ + public BlobRange getRange() { + return range; + } + + /** + * @param range {@link BlobRange} + * @return The updated options. + */ + public BlobInputStreamOptions setRange(BlobRange range) { + this.range = range; + return this; + } + + /** + * @return {@link BlobRequestConditions} + */ + public BlobRequestConditions getRequestConditions() { + return requestConditions; + } + + /** + * @param requestConditions {@link BlobRequestConditions} + * @return The updated options. + */ + public BlobInputStreamOptions setRequestConditions(BlobRequestConditions requestConditions) { + this.requestConditions = requestConditions; + return this; + } + + /** + * @return The size of each data chunk returned from the service. If block size is large, input stream will make + * fewer network calls, but each individual call will send more data and will therefore take longer. + * The default value is 4 MB. + */ + public Integer getChunkSize() { + return chunkSize; + } + + /** + * @param chunkSize The size of each data chunk returned from the service. If block size is large, input stream + * will make fewer network calls, but each individual call will send more data and will therefore take longer. + * The default value is 4 MB. + * @return The updated options. + */ + public BlobInputStreamOptions setChunkSize(Integer chunkSize) { + this.chunkSize = chunkSize; + return this; + } +} diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobClientBase.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobClientBase.java index 7be55e86ef6c8..41602c9ec97d5 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobClientBase.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobClientBase.java @@ -25,6 +25,7 @@ import com.azure.storage.blob.models.BlobQueryAsyncResponse; import com.azure.storage.blob.options.BlobDownloadToFileOptions; import com.azure.storage.blob.options.BlobGetTagsOptions; +import com.azure.storage.blob.options.BlobInputStreamOptions; import com.azure.storage.blob.options.BlobQueryOptions; import com.azure.storage.blob.models.BlobQueryResponse; import com.azure.storage.blob.models.BlobRange; @@ -41,6 +42,7 @@ import com.azure.storage.blob.options.BlobSetTagsOptions; import com.azure.storage.blob.sas.BlobServiceSasSignatureValues; import com.azure.storage.common.StorageSharedKeyCredential; +import com.azure.storage.common.implementation.Constants; import com.azure.storage.common.implementation.FluxInputStream; import com.azure.storage.common.implementation.StorageImplUtils; import reactor.core.Exceptions; @@ -219,7 +221,7 @@ public boolean isSnapshot() { * @throws BlobStorageException If a storage service error occurred. */ public final BlobInputStream openInputStream() { - return openInputStream(new BlobRange(0), null); + return openInputStream(null, null); } /** @@ -233,7 +235,23 @@ public final BlobInputStream openInputStream() { * @throws BlobStorageException If a storage service error occurred. */ public final BlobInputStream openInputStream(BlobRange range, BlobRequestConditions requestConditions) { - return new BlobInputStream(client, range.getOffset(), range.getCount(), requestConditions); + return openInputStream(new BlobInputStreamOptions().setRange(range).setRequestConditions(requestConditions)); + } + + /** + * Opens a blob input stream to download the specified range of the blob. + * + * @param options {@link BlobInputStreamOptions} + * @return An InputStream object that represents the stream to use for reading from the blob. + * @throws BlobStorageException If a storage service error occurred. + */ + public final BlobInputStream openInputStream(BlobInputStreamOptions options) { + options = options == null ? new BlobInputStreamOptions() : options; + BlobRange range = options.getRange() == null ? new BlobRange(0) : options.getRange(); + int chunkSize = options.getChunkSize() == null ? 4 * Constants.MB : options.getChunkSize(); + + return new BlobInputStream(client, range.getOffset(), range.getCount(), chunkSize, + options.getRequestConditions(), getProperties()); } /** diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobInputStream.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobInputStream.java index 8ec7602ca7223..ef5bfb3538ca3 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobInputStream.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobInputStream.java @@ -34,19 +34,6 @@ public final class BlobInputStream extends StorageInputStream { */ private BlobProperties properties; - /** - * Initializes a new instance of the BlobInputStream class. - * - * @param blobClient A {@link BlobAsyncClient} object which represents the blob that this stream is associated with. - * @param accessCondition An {@link BlobRequestConditions} object which represents the access conditions for the - * blob. - * @throws BlobStorageException An exception representing any error which occurred during the operation. - */ - BlobInputStream(final BlobAsyncClient blobClient, final BlobRequestConditions accessCondition) - throws BlobStorageException { - this(blobClient, 0, null, accessCondition); - } - /** * Initializes a new instance of the BlobInputStream class. Note that if {@code blobRangeOffset} is not {@code 0} or * {@code blobRangeLength} is not {@code null}, there will be no content MD5 verification. @@ -55,19 +42,19 @@ public final class BlobInputStream extends StorageInputStream { * with. * @param blobRangeOffset The offset of blob data to begin stream. * @param blobRangeLength How much data the stream should return after blobRangeOffset. + * @param chunkSize The size of the chunk to download. * @param accessCondition An {@link BlobRequestConditions} object which represents the access conditions for the * blob. * @throws BlobStorageException An exception representing any error which occurred during the operation. */ - BlobInputStream(final BlobAsyncClientBase blobClient, long blobRangeOffset, Long blobRangeLength, - final BlobRequestConditions accessCondition) + BlobInputStream(final BlobAsyncClientBase blobClient, long blobRangeOffset, Long blobRangeLength, int chunkSize, + final BlobRequestConditions accessCondition, final BlobProperties blobProperties) throws BlobStorageException { - super(blobRangeOffset, blobRangeLength, 4 * Constants.MB, - blobClient.getProperties().block().getBlobSize()); + super(blobRangeOffset, blobRangeLength, chunkSize, blobProperties.getBlobSize()); this.blobClient = blobClient; this.accessCondition = accessCondition; - this.properties = null; + this.properties = blobProperties; } /** @@ -83,11 +70,6 @@ protected synchronized ByteBuffer dispatchRead(final int readLength, final long ByteBuffer currentBuffer = this.blobClient.downloadWithResponse(new BlobRange(offset, (long) readLength), null, this.accessCondition, false) .flatMap(response -> { - // Only populate properties if it has not been populated yet, this is ok since we etag lock on the - // blob while downloading, so it is guaranteed to be the same. - if (this.properties == null) { - this.properties = buildBlobProperties(response.getDeserializedHeaders()); - } return FluxUtil.collectBytesInByteBufferStream(response.getValue()).map(ByteBuffer::wrap); }) .block(); @@ -102,21 +84,6 @@ protected synchronized ByteBuffer dispatchRead(final int readLength, final long } } - private static BlobProperties buildBlobProperties(BlobDownloadHeaders hd) { - if (hd == null) { - return null; - } - return new BlobProperties(null, hd.getLastModified(), hd.getETag(), - hd.getContentLength() == null ? 0 : hd.getContentLength(), hd.getContentType(), null, - hd.getContentEncoding(), hd.getContentDisposition(), hd.getContentLanguage(), hd.getCacheControl(), - hd.getBlobSequenceNumber(), hd.getBlobType(), hd.getLeaseStatus(), hd.getLeaseState(), - hd.getLeaseDuration(), hd.getCopyId(), hd.getCopyStatus(), hd.getCopySource(), hd.getCopyProgress(), - hd.getCopyCompletionTime(), hd.getCopyStatusDescription(), hd.isServerEncrypted(), - null, null, null, null, null, hd.getEncryptionKeySha256(), hd.getEncryptionScope(), null, hd.getMetadata(), - hd.getBlobCommittedBlockCount(), hd.getTagCount(), hd.getVersionId(), null, - hd.getObjectReplicationSourcePolicies(), hd.getObjectReplicationDestinationPolicyId()); - } - /** * Gets the blob properties. *

@@ -126,9 +93,6 @@ private static BlobProperties buildBlobProperties(BlobDownloadHeaders hd) { * @return {@link BlobProperties} */ public BlobProperties getProperties() { - if (this.properties == null) { - this.properties = blobClient.getPropertiesWithResponse(accessCondition).block().getValue(); - } return this.properties; } diff --git a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlockBlobInputOutputStreamTest.groovy b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlockBlobInputOutputStreamTest.groovy index 00591e9424d18..e989a76b31bf4 100644 --- a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlockBlobInputOutputStreamTest.groovy +++ b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlockBlobInputOutputStreamTest.groovy @@ -1,6 +1,7 @@ package com.azure.storage.blob import com.azure.storage.blob.models.BlobType +import com.azure.storage.blob.options.BlobInputStreamOptions import com.azure.storage.blob.specialized.BlobOutputStream import com.azure.storage.blob.specialized.BlockBlobClient import com.azure.storage.common.implementation.Constants @@ -65,11 +66,57 @@ class BlockBlobInputOutputStreamTest extends APISpec { } def propertiesAfter = inputStream.getProperties() propertiesAfter.getBlobType() == BlobType.BLOCK_BLOB - propertiesAfter.getBlobSize() == 4 * Constants.MB + propertiesAfter.getBlobSize() == 5 * Constants.MB byte[] randomBytes2 = outputStream.toByteArray() assert randomBytes2 == Arrays.copyOfRange(randomBytes, 1 * Constants.MB, 6 * Constants.MB) } + // Only run this test in live mode as BlobOutputStream dynamically assigns blocks + @Requires({ liveMode() }) + @Unroll + def "Upload download chunk size"() { + when: + int length = 6 * Constants.MB + byte[] randomBytes = getRandomByteArray(length) + + BlobOutputStream outStream = bc.getBlobOutputStream() + outStream.write(randomBytes, 0, 6 * Constants.MB) + outStream.close() + + then: + def inputStream = bc.openInputStream(new BlobInputStreamOptions().setChunkSize(chunkSize)) + int b + def outputStream = new ByteArrayOutputStream() + try { + for (int i = 0; i < numChunks; i++) { + b = inputStream.read() + assert b != -1 + outputStream.write(b) + assert inputStream.available() == sizes[i] - 1 // Make sure the internal buffer is the expected chunk size. + // Read the rest of the chunk + for (int j = 0; j < sizes[i] - 1; j++) { + b = inputStream.read() + assert b != -1 + outputStream.write(b) + } + } + } catch (IOException ex) { + throw new UncheckedIOException(ex) + } + assert inputStream.read() == -1 // Make sure we are at the end of the stream. + def propertiesAfter = inputStream.getProperties() + propertiesAfter.getBlobType() == BlobType.BLOCK_BLOB + propertiesAfter.getBlobSize() == 6 * Constants.MB + byte[] randomBytes2 = outputStream.toByteArray() + assert randomBytes2 == randomBytes + + where: + chunkSize || numChunks | sizes + null || 2 | [4 * Constants.MB, 2 * Constants.MB] // Default + 5 * Constants.MB || 2 | [5 * Constants.MB, 1 * Constants.MB] // Greater than default + 3 * Constants.MB || 2 | [3 * Constants.MB, 3 * Constants.MB] // Smaller than default + } + // Only run this test in live mode as BlobOutputStream dynamically assigns blocks @Requires({ liveMode() }) def "Get properties before"() { From 3c149c6e0c5eb5087d5da729aff121cdc9c7877b Mon Sep 17 00:00:00 2001 From: Gauri Prasad Date: Tue, 22 Sep 2020 16:32:34 -0700 Subject: [PATCH 2/4] Renamed chunkSize to blockSize --- sdk/storage/azure-storage-blob/CHANGELOG.md | 2 ++ .../storage/blob/options/BlobInputStreamOptions.java | 12 ++++++------ .../storage/blob/specialized/BlobClientBase.java | 2 +- .../blob/BlockBlobInputOutputStreamTest.groovy | 6 +++--- 4 files changed, 12 insertions(+), 10 deletions(-) diff --git a/sdk/storage/azure-storage-blob/CHANGELOG.md b/sdk/storage/azure-storage-blob/CHANGELOG.md index 5d5ff722783da..ff7b58f347c6d 100644 --- a/sdk/storage/azure-storage-blob/CHANGELOG.md +++ b/sdk/storage/azure-storage-blob/CHANGELOG.md @@ -1,6 +1,8 @@ # Release History ## 12.9.0-beta.1 (Unreleased) +- Added support to specify block size when using BlobInputStream. + - Fixed a bug where users could not download more than 5000MB of data in one shot in the downloadToFile API. - Fixed a bug where the TokenCredential scope would be incorrect for custom URLs. - Fixed a bug where Default Azure Credential would not work with Azurite. diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/options/BlobInputStreamOptions.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/options/BlobInputStreamOptions.java index 1230aa04687ae..645b4376fa77b 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/options/BlobInputStreamOptions.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/options/BlobInputStreamOptions.java @@ -15,7 +15,7 @@ public class BlobInputStreamOptions { private BlobRange range; private BlobRequestConditions requestConditions; - private Integer chunkSize; + private Integer blockSize; /** * @return {@link BlobRange} @@ -54,18 +54,18 @@ public BlobInputStreamOptions setRequestConditions(BlobRequestConditions request * fewer network calls, but each individual call will send more data and will therefore take longer. * The default value is 4 MB. */ - public Integer getChunkSize() { - return chunkSize; + public Integer getBlockSize() { + return blockSize; } /** - * @param chunkSize The size of each data chunk returned from the service. If block size is large, input stream + * @param blockSize The size of each data chunk returned from the service. If block size is large, input stream * will make fewer network calls, but each individual call will send more data and will therefore take longer. * The default value is 4 MB. * @return The updated options. */ - public BlobInputStreamOptions setChunkSize(Integer chunkSize) { - this.chunkSize = chunkSize; + public BlobInputStreamOptions setBlockSize(Integer blockSize) { + this.blockSize = blockSize; return this; } } diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobClientBase.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobClientBase.java index 41602c9ec97d5..da32a5c92ebe3 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobClientBase.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobClientBase.java @@ -248,7 +248,7 @@ public final BlobInputStream openInputStream(BlobRange range, BlobRequestConditi public final BlobInputStream openInputStream(BlobInputStreamOptions options) { options = options == null ? new BlobInputStreamOptions() : options; BlobRange range = options.getRange() == null ? new BlobRange(0) : options.getRange(); - int chunkSize = options.getChunkSize() == null ? 4 * Constants.MB : options.getChunkSize(); + int chunkSize = options.getBlockSize() == null ? 4 * Constants.MB : options.getBlockSize(); return new BlobInputStream(client, range.getOffset(), range.getCount(), chunkSize, options.getRequestConditions(), getProperties()); diff --git a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlockBlobInputOutputStreamTest.groovy b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlockBlobInputOutputStreamTest.groovy index e989a76b31bf4..e573854ae103c 100644 --- a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlockBlobInputOutputStreamTest.groovy +++ b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlockBlobInputOutputStreamTest.groovy @@ -74,7 +74,7 @@ class BlockBlobInputOutputStreamTest extends APISpec { // Only run this test in live mode as BlobOutputStream dynamically assigns blocks @Requires({ liveMode() }) @Unroll - def "Upload download chunk size"() { + def "Upload download block size"() { when: int length = 6 * Constants.MB byte[] randomBytes = getRandomByteArray(length) @@ -84,7 +84,7 @@ class BlockBlobInputOutputStreamTest extends APISpec { outStream.close() then: - def inputStream = bc.openInputStream(new BlobInputStreamOptions().setChunkSize(chunkSize)) + def inputStream = bc.openInputStream(new BlobInputStreamOptions().setBlockSize(blockSize)) int b def outputStream = new ByteArrayOutputStream() try { @@ -111,7 +111,7 @@ class BlockBlobInputOutputStreamTest extends APISpec { assert randomBytes2 == randomBytes where: - chunkSize || numChunks | sizes + blockSize || numChunks | sizes null || 2 | [4 * Constants.MB, 2 * Constants.MB] // Default 5 * Constants.MB || 2 | [5 * Constants.MB, 1 * Constants.MB] // Greater than default 3 * Constants.MB || 2 | [3 * Constants.MB, 3 * Constants.MB] // Smaller than default From 705c3bac367eb07c7346f9a49fba6a0a61e37913 Mon Sep 17 00:00:00 2001 From: Gauri Prasad Date: Tue, 22 Sep 2020 16:47:00 -0700 Subject: [PATCH 3/4] Analyze step --- .../com/azure/storage/blob/specialized/BlobInputStream.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobInputStream.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobInputStream.java index ef5bfb3538ca3..bf66188ca9ec9 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobInputStream.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobInputStream.java @@ -3,14 +3,11 @@ package com.azure.storage.blob.specialized; import com.azure.core.util.FluxUtil; -import com.azure.storage.blob.BlobAsyncClient; -import com.azure.storage.blob.models.BlobDownloadHeaders; import com.azure.storage.blob.models.BlobProperties; import com.azure.storage.blob.models.BlobRange; import com.azure.storage.blob.models.BlobRequestConditions; import com.azure.storage.blob.models.BlobStorageException; import com.azure.storage.common.StorageInputStream; -import com.azure.storage.common.implementation.Constants; import java.io.IOException; import java.nio.ByteBuffer; @@ -32,7 +29,7 @@ public final class BlobInputStream extends StorageInputStream { /** * Holds the {@link BlobProperties} object that represents the blob's properties. */ - private BlobProperties properties; + private final BlobProperties properties; /** * Initializes a new instance of the BlobInputStream class. Note that if {@code blobRangeOffset} is not {@code 0} or From c4e626c91c13e5452490e53d381b1dfd064165e5 Mon Sep 17 00:00:00 2001 From: Gauri Prasad Date: Tue, 22 Sep 2020 17:01:05 -0700 Subject: [PATCH 4/4] Made method non final --- .../java/com/azure/storage/blob/specialized/BlobClientBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobClientBase.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobClientBase.java index da32a5c92ebe3..a2ab5e01c6600 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobClientBase.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobClientBase.java @@ -245,7 +245,7 @@ public final BlobInputStream openInputStream(BlobRange range, BlobRequestConditi * @return An InputStream object that represents the stream to use for reading from the blob. * @throws BlobStorageException If a storage service error occurred. */ - public final BlobInputStream openInputStream(BlobInputStreamOptions options) { + public BlobInputStream openInputStream(BlobInputStreamOptions options) { options = options == null ? new BlobInputStreamOptions() : options; BlobRange range = options.getRange() == null ? new BlobRange(0) : options.getRange(); int chunkSize = options.getBlockSize() == null ? 4 * Constants.MB : options.getBlockSize();