Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support to specify block size in BlobInputStream #15496

Merged
merged 4 commits into from
Sep 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdk/storage/azure-storage-blob/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Release History

## 12.9.0-beta.1 (Unreleased)
- Added support to specify block size when using BlobInputStream.

- Fixed a bug where users could not download more than 5000MB of data in one shot in the downloadToFile API.
- Fixed a bug where the TokenCredential scope would be incorrect for custom URLs.
- Fixed a bug where Default Azure Credential would not work with Azurite.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.storage.blob.options;

import com.azure.core.annotation.Fluent;
import com.azure.storage.blob.models.BlobRange;
import com.azure.storage.blob.models.BlobRequestConditions;

/**
 * Extended options that may be passed when opening a blob input stream.
 */
@Fluent
public class BlobInputStreamOptions {

    private BlobRange range;
    private BlobRequestConditions requestConditions;
    private Integer blockSize;

    /**
     * Gets the {@link BlobRange} of blob data to stream. A {@code null} range means the entire blob.
     *
     * @return {@link BlobRange}
     */
    public BlobRange getRange() {
        return range;
    }

    /**
     * Sets the {@link BlobRange} of blob data to stream.
     *
     * @param range {@link BlobRange}
     * @return The updated options.
     */
    public BlobInputStreamOptions setRange(BlobRange range) {
        this.range = range;
        return this;
    }

    /**
     * Gets the {@link BlobRequestConditions} used to open the stream.
     *
     * @return {@link BlobRequestConditions}
     */
    public BlobRequestConditions getRequestConditions() {
        return requestConditions;
    }

    /**
     * Sets the {@link BlobRequestConditions} used to open the stream.
     *
     * @param requestConditions {@link BlobRequestConditions}
     * @return The updated options.
     */
    public BlobInputStreamOptions setRequestConditions(BlobRequestConditions requestConditions) {
        this.requestConditions = requestConditions;
        return this;
    }

    /**
     * Gets the size of each data chunk returned from the service. If block size is large, the input stream will make
     * fewer network calls, but each individual call will send more data and will therefore take longer.
     * A {@code null} value indicates the consumer's default will be used (4 MB at the time of writing).
     *
     * @return The size of each data chunk returned from the service. If block size is large, input stream will make
     * fewer network calls, but each individual call will send more data and will therefore take longer.
     * The default value is 4 MB.
     */
    public Integer getBlockSize() {
        return blockSize;
    }

    /**
     * Sets the size of each data chunk returned from the service.
     *
     * @param blockSize The size of each data chunk returned from the service. If block size is large, input stream
     * will make fewer network calls, but each individual call will send more data and will therefore take longer.
     * The default value is 4 MB.
     * @return The updated options.
     */
    public BlobInputStreamOptions setBlockSize(Integer blockSize) {
        this.blockSize = blockSize;
        return this;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.azure.storage.blob.models.BlobQueryAsyncResponse;
import com.azure.storage.blob.options.BlobDownloadToFileOptions;
import com.azure.storage.blob.options.BlobGetTagsOptions;
import com.azure.storage.blob.options.BlobInputStreamOptions;
import com.azure.storage.blob.options.BlobQueryOptions;
import com.azure.storage.blob.models.BlobQueryResponse;
import com.azure.storage.blob.models.BlobRange;
Expand All @@ -41,6 +42,7 @@
import com.azure.storage.blob.options.BlobSetTagsOptions;
import com.azure.storage.blob.sas.BlobServiceSasSignatureValues;
import com.azure.storage.common.StorageSharedKeyCredential;
import com.azure.storage.common.implementation.Constants;
import com.azure.storage.common.implementation.FluxInputStream;
import com.azure.storage.common.implementation.StorageImplUtils;
import reactor.core.Exceptions;
Expand Down Expand Up @@ -219,7 +221,7 @@ public boolean isSnapshot() {
* @throws BlobStorageException If a storage service error occurred.
*/
public final BlobInputStream openInputStream() {
    // Null options fall through to the defaults: full blob, no access conditions, default block size.
    return openInputStream((BlobInputStreamOptions) null);
}

/**
 * Opens a blob input stream to download the specified range of the blob.
 *
 * @param range {@link BlobRange} of blob data to stream; {@code null} streams the whole blob from offset 0.
 * @param requestConditions An {@link BlobRequestConditions} object that represents the access conditions for the
 * blob.
 * @return An <code>InputStream</code> object that represents the stream to use for reading from the blob.
 * @throws BlobStorageException If a storage service error occurred.
 */
public final BlobInputStream openInputStream(BlobRange range, BlobRequestConditions requestConditions) {
    // Delegate to the options-based overload so all openInputStream paths share one implementation.
    return openInputStream(new BlobInputStreamOptions().setRange(range).setRequestConditions(requestConditions));
}

/**
* Opens a blob input stream to download the specified range of the blob.
*
* @param options {@link BlobInputStreamOptions}
* @return An <code>InputStream</code> object that represents the stream to use for reading from the blob.
* @throws BlobStorageException If a storage service error occurred.
*/
public BlobInputStream openInputStream(BlobInputStreamOptions options) {
options = options == null ? new BlobInputStreamOptions() : options;
BlobRange range = options.getRange() == null ? new BlobRange(0) : options.getRange();
int chunkSize = options.getBlockSize() == null ? 4 * Constants.MB : options.getBlockSize();

return new BlobInputStream(client, range.getOffset(), range.getCount(), chunkSize,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is a minor change where we will make an additional service call to get the properties of the blob before opening the stream. Are we okay with that? If so, we should add a quick document stating that we will be doing this as it is a behavioral change from the past.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I think we already called getProperties before, I just moved it outside since we can't call it in the constructor before a call to super

options.getRequestConditions(), getProperties());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,11 @@
package com.azure.storage.blob.specialized;

import com.azure.core.util.FluxUtil;
import com.azure.storage.blob.BlobAsyncClient;
import com.azure.storage.blob.models.BlobDownloadHeaders;
import com.azure.storage.blob.models.BlobProperties;
import com.azure.storage.blob.models.BlobRange;
import com.azure.storage.blob.models.BlobRequestConditions;
import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.common.StorageInputStream;
import com.azure.storage.common.implementation.Constants;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand All @@ -32,20 +29,7 @@ public final class BlobInputStream extends StorageInputStream {
/**
 * Holds the {@link BlobProperties} object that represents the blob's properties.
 */
private final BlobProperties properties;

/**
 * Initializes a new instance of the BlobInputStream class. Note that if {@code blobRangeOffset} is not {@code 0} or
 * {@code blobRangeLength} is not {@code null}, a range download is performed rather than a full-blob download.
 *
 * @param blobClient A {@link BlobAsyncClientBase} object which represents the blob that this stream is associated
 * with.
 * @param blobRangeOffset The offset of blob data to begin stream.
 * @param blobRangeLength How much data the stream should return after blobRangeOffset.
 * @param chunkSize The size of the chunk to download.
 * @param accessCondition An {@link BlobRequestConditions} object which represents the access conditions for the
 * blob.
 * @param blobProperties The blob's {@link BlobProperties}, retrieved by the caller before construction; its size is
 * needed before the call to {@code super}, so it cannot be fetched inside this constructor.
 * @throws BlobStorageException An exception representing any error which occurred during the operation.
 */
BlobInputStream(final BlobAsyncClientBase blobClient, long blobRangeOffset, Long blobRangeLength, int chunkSize,
    final BlobRequestConditions accessCondition, final BlobProperties blobProperties)
    throws BlobStorageException {
    super(blobRangeOffset, blobRangeLength, chunkSize, blobProperties.getBlobSize());

    this.blobClient = blobClient;
    this.accessCondition = accessCondition;
    this.properties = blobProperties;
}

/**
Expand All @@ -83,11 +67,6 @@ protected synchronized ByteBuffer dispatchRead(final int readLength, final long
ByteBuffer currentBuffer = this.blobClient.downloadWithResponse(new BlobRange(offset,
(long) readLength), null, this.accessCondition, false)
.flatMap(response -> {
// Only populate properties if it has not been populated yet, this is ok since we etag lock on the
// blob while downloading, so it is guaranteed to be the same.
if (this.properties == null) {
this.properties = buildBlobProperties(response.getDeserializedHeaders());
}
return FluxUtil.collectBytesInByteBufferStream(response.getValue()).map(ByteBuffer::wrap);
})
.block();
Expand All @@ -102,21 +81,6 @@ protected synchronized ByteBuffer dispatchRead(final int readLength, final long
}
}

private static BlobProperties buildBlobProperties(BlobDownloadHeaders hd) {
if (hd == null) {
return null;
}
return new BlobProperties(null, hd.getLastModified(), hd.getETag(),
hd.getContentLength() == null ? 0 : hd.getContentLength(), hd.getContentType(), null,
hd.getContentEncoding(), hd.getContentDisposition(), hd.getContentLanguage(), hd.getCacheControl(),
hd.getBlobSequenceNumber(), hd.getBlobType(), hd.getLeaseStatus(), hd.getLeaseState(),
hd.getLeaseDuration(), hd.getCopyId(), hd.getCopyStatus(), hd.getCopySource(), hd.getCopyProgress(),
hd.getCopyCompletionTime(), hd.getCopyStatusDescription(), hd.isServerEncrypted(),
null, null, null, null, null, hd.getEncryptionKeySha256(), hd.getEncryptionScope(), null, hd.getMetadata(),
hd.getBlobCommittedBlockCount(), hd.getTagCount(), hd.getVersionId(), null,
hd.getObjectReplicationSourcePolicies(), hd.getObjectReplicationDestinationPolicyId());
}

/**
 * Gets the blob properties.
 * <p>
 * The properties are captured when the stream is opened, so this accessor does not make a network call.
 *
 * @return {@link BlobProperties}
 */
public BlobProperties getProperties() {
    return this.properties;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.azure.storage.blob

import com.azure.storage.blob.models.BlobType
import com.azure.storage.blob.options.BlobInputStreamOptions
import com.azure.storage.blob.specialized.BlobOutputStream
import com.azure.storage.blob.specialized.BlockBlobClient
import com.azure.storage.common.implementation.Constants
Expand Down Expand Up @@ -65,11 +66,57 @@ class BlockBlobInputOutputStreamTest extends APISpec {
}
def propertiesAfter = inputStream.getProperties()
propertiesAfter.getBlobType() == BlobType.BLOCK_BLOB
propertiesAfter.getBlobSize() == 4 * Constants.MB
propertiesAfter.getBlobSize() == 5 * Constants.MB
byte[] randomBytes2 = outputStream.toByteArray()
assert randomBytes2 == Arrays.copyOfRange(randomBytes, 1 * Constants.MB, 6 * Constants.MB)
}

// Only run this test in live mode as BlobOutputStream dynamically assigns blocks
@Requires({ liveMode() })
@Unroll
def "Upload download block size"() {
when:
int length = 6 * Constants.MB
byte[] randomBytes = getRandomByteArray(length)

BlobOutputStream outStream = bc.getBlobOutputStream()
outStream.write(randomBytes, 0, 6 * Constants.MB)
outStream.close()

then:
def inputStream = bc.openInputStream(new BlobInputStreamOptions().setBlockSize(blockSize))
int b
def outputStream = new ByteArrayOutputStream()
try {
for (int i = 0; i < numChunks; i++) {
b = inputStream.read()
assert b != -1
outputStream.write(b)
assert inputStream.available() == sizes[i] - 1 // Make sure the internal buffer is the expected chunk size.
// Read the rest of the chunk
for (int j = 0; j < sizes[i] - 1; j++) {
b = inputStream.read()
assert b != -1
outputStream.write(b)
}
}
} catch (IOException ex) {
throw new UncheckedIOException(ex)
}
assert inputStream.read() == -1 // Make sure we are at the end of the stream.
def propertiesAfter = inputStream.getProperties()
propertiesAfter.getBlobType() == BlobType.BLOCK_BLOB
propertiesAfter.getBlobSize() == 6 * Constants.MB
byte[] randomBytes2 = outputStream.toByteArray()
assert randomBytes2 == randomBytes

where:
blockSize || numChunks | sizes
null || 2 | [4 * Constants.MB, 2 * Constants.MB] // Default
5 * Constants.MB || 2 | [5 * Constants.MB, 1 * Constants.MB] // Greater than default
3 * Constants.MB || 2 | [3 * Constants.MB, 3 * Constants.MB] // Smaller than default
}

// Only run this test in live mode as BlobOutputStream dynamically assigns blocks
@Requires({ liveMode() })
def "Get properties before"() {
Expand Down