From a094170a51aacfe35de8d0841f3f6eac8182197f Mon Sep 17 00:00:00 2001
From: Rabab Ibrahim <88855721+ibrahimrabab@users.noreply.github.com>
Date: Fri, 23 Sep 2022 10:37:15 -0700
Subject: [PATCH] Adding new overload with length for BlobAsyncClient.upload()
 (#30672)

---
 .../options/BlobParallelUploadOptions.java    |   2 +-
 .../BufferedUploadWithKnownLengthExample.java | 105 ++++++++++++++++++
 .../blob/specialized/BlockBlobAPITest.groovy  |  33 ++
 3 files changed, 139 insertions(+), 1 deletion(-)
 create mode 100644 sdk/storage/azure-storage-blob/src/samples/java/com/azure/storage/blob/BufferedUploadWithKnownLengthExample.java

diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/options/BlobParallelUploadOptions.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/options/BlobParallelUploadOptions.java
index 8820963977f73..3d4527aa9d5a3 100644
--- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/options/BlobParallelUploadOptions.java
+++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/options/BlobParallelUploadOptions.java
@@ -105,7 +105,7 @@ public BlobParallelUploadOptions(BinaryData data) {
         StorageImplUtils.assertNotNull("data", data);
         this.dataFlux = Flux.just(data.toByteBuffer());
         this.dataStream = null;
-        this.length = null;
+        this.length = data.getLength();
     }
 
     /**
diff --git a/sdk/storage/azure-storage-blob/src/samples/java/com/azure/storage/blob/BufferedUploadWithKnownLengthExample.java b/sdk/storage/azure-storage-blob/src/samples/java/com/azure/storage/blob/BufferedUploadWithKnownLengthExample.java
new file mode 100644
index 0000000000000..a5eb0bf0c9195
--- /dev/null
+++ b/sdk/storage/azure-storage-blob/src/samples/java/com/azure/storage/blob/BufferedUploadWithKnownLengthExample.java
@@ -0,0 +1,105 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.storage.blob;
+
+import com.azure.core.util.BinaryData;
+import com.azure.storage.blob.models.ParallelTransferOptions;
+import com.azure.storage.blob.options.BlobParallelUploadOptions;
+import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
+import com.azure.storage.common.StorageSharedKeyCredential;
+import reactor.core.publisher.Flux;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.Locale;
+
+/**
+ * This example shows how to use the buffered upload method on BlockBlobAsyncClient with a known length.
+ *
+ * Note that the use of .block() in the method is only used to enable the sample to run effectively in isolation. It is
+ * not recommended for use in async environments.
+ */
+public class BufferedUploadWithKnownLengthExample {
+    /**
+     * Entry point into the basic examples for Storage blobs.
+     * @param args Unused. Arguments to the program.
+     * @throws IOException If an I/O error occurs
+     * @throws RuntimeException If the downloaded data doesn't match the uploaded data
+     */
+    public static void main(String[] args) throws IOException {
+
+        /*
+         * For more information on this setup, please refer to the BasicExample.
+         */
+        String accountName = SampleHelper.getAccountName();
+        String accountKey = SampleHelper.getAccountKey();
+        StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, accountKey);
+        String endpoint = String.format(Locale.ROOT, "https://%s.blob.core.windows.net", accountName);
+        String containerName = "myjavacontainerbuffereduploadlength" + System.currentTimeMillis();
+        BlobServiceAsyncClient storageClient = new BlobServiceClientBuilder().endpoint(endpoint).credential(credential)
+            .buildAsyncClient();
+
+        BlobContainerAsyncClient containerClient = storageClient.getBlobContainerAsyncClient(containerName);
+        containerClient.create().block();
+
+        uploadSourceBlob(endpoint, credential, containerName);
+        BlobAsyncClient blobClient = containerClient.getBlobAsyncClient("HelloWorld.txt");
+
+
+        /*
+        sourceData has a network stream as its source and therefore likely does not support multiple subscribers. Even
+        if it did support multiple subscribers, it would not produce the same data each time it was subscribed to. While
+        we could inspect the HTTP headers for the Content-Length, let us suppose that this information is unavailable
+        at this time. All three of these factors would individually make the use of the standard upload method
+        impossible: the first two because retries would not work, and the third because we could not satisfy the
+        argument list.
+        */
+        Flux<ByteBuffer> sourceData = getSourceBlobClient(endpoint, credential, containerName).downloadStream()
+            // Perform a transformation that truncates each buffer to a known length of 10 bytes.
+            .map(BufferedUploadWithKnownLengthExample::bufferTransformation);
+
+        /*
+        Although this upload overload permits the use of such unreliable data sources, supplying a known length speeds
+        up the upload. A block size and maximum concurrency can still be passed in to further tune the transfer.
+        */
+        long length = 10;
+        long blockSize = 10 * 1024;
+        int maxConcurrency = 5;
+        ParallelTransferOptions parallelTransferOptions = new ParallelTransferOptions()
+            .setBlockSizeLong(blockSize)
+            .setMaxConcurrency(maxConcurrency);
+
+        // Since we already know the size of our buffered bytes, we can pass the Flux and length to the BinaryData.
+        // This will internally convert the BinaryData to a Flux, but with the known length the client can optimize
+        // the upload speed.
+        // We need to use BinaryData.fromFlux(Flux, Long, Boolean) with bufferContent set to false. This allows
+        // us to configure the BinaryData with a specified length without the BinaryData eagerly buffering the
+        // entire Flux.
+        BinaryData.fromFlux(sourceData, length, false).flatMap(binaryData ->
+            blobClient.uploadWithResponse(new BlobParallelUploadOptions(binaryData)
+                .setParallelTransferOptions(parallelTransferOptions))).block();
+    }
+
+    @SuppressWarnings("cast")
+    private static ByteBuffer bufferTransformation(ByteBuffer buffer) {
+        // The JDK changed the return type of ByteBuffer#limit between 8 and 9. In 8 and below it returns Buffer, whereas
+        // in JDK 9 and later, it returns ByteBuffer. To compile on both, we explicitly cast the returned value to
+        // ByteBuffer.
+        // See https://bugs.openjdk.java.net/browse/JDK-8062376
+        int length = 10;
+        return (ByteBuffer) buffer.limit(length);
+    }
+
+    private static void uploadSourceBlob(String endpoint, StorageSharedKeyCredential credential, String containerName) {
+        getSourceBlobClient(endpoint, credential, containerName)
+            .upload(Flux.just(ByteBuffer.wrap("Hello world".getBytes(Charset.defaultCharset()))), "Hello world".length()).block();
+    }
+
+    private static BlockBlobAsyncClient getSourceBlobClient(String endpoint, StorageSharedKeyCredential credential,
+        String containerName) {
+        return new BlobServiceClientBuilder().endpoint(endpoint).credential(credential).buildAsyncClient()
+            .getBlobContainerAsyncClient(containerName).getBlobAsyncClient("sourceBlob").getBlockBlobAsyncClient();
+    }
+}
diff --git a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/BlockBlobAPITest.groovy b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/BlockBlobAPITest.groovy
index 640f94fc76ce5..b7bc6f61e34e9 100644
--- a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/BlockBlobAPITest.groovy
+++ b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/BlockBlobAPITest.groovy
@@ -2042,6 +2042,28 @@ class BlockBlobAPITest extends APISpec {
         100      | 50               | 20        || 5 // Test that blockSize is respected
     }
 
+    @Unroll
+    @LiveOnly
+    def "Buffered upload with length"() {
+        setup:
+        def data = Flux.just(getRandomData(dataSize))
+        def binaryData = BinaryData.fromFlux(data, dataSize).block()
+        def parallelUploadOptions = new BlobParallelUploadOptions(binaryData)
+            .setParallelTransferOptions(new ParallelTransferOptions().setBlockSizeLong(blockSize).setMaxSingleUploadSizeLong(singleUploadSize))
+
+        when:
+        blobAsyncClient.uploadWithResponse(parallelUploadOptions).block()
+
+        then:
+        blobAsyncClient.getBlockBlobAsyncClient()
+            .listBlocks(BlockListType.COMMITTED).block().getCommittedBlocks().size() == expectedBlockCount
+
+        where:
+        dataSize | singleUploadSize | blockSize || expectedBlockCount
+        100      | 100              | null      || 0 // Test that singleUploadSize is respected
+        100      | 50               | 20        || 5 // Test that blockSize is respected
+    }
+
     // Only run these tests in live mode as they use variables that can't be captured.
     @Unroll
     @LiveOnly
@@ -2250,6 +2272,17 @@ class BlockBlobAPITest extends APISpec {
         smallFile.delete()
     }
 
+    @LiveOnly
+    def "Buffered upload with specified length"() {
+        setup:
+        def fluxData = Flux.just(getRandomData(data.getDefaultDataSizeLong() as int))
+        def binaryData = BinaryData.fromFlux(fluxData, data.getDefaultDataSizeLong()).block()
+        def parallelUploadOptions = new BlobParallelUploadOptions(binaryData)
+        expect:
+        StepVerifier.create(blobAsyncClient.uploadWithResponse(parallelUploadOptions))
+            .assertNext({ assert it.getValue().getETag() != null }).verifyComplete()
+    }
+
     @LiveOnly
     def "Buffered upload overwrite"() {
         when:
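
Not part of the patch, but a minimal sketch of what the one-line BlobParallelUploadOptions change
means for callers. It assumes an existing blobAsyncClient, and the string payload is illustrative:

    // Hypothetical usage, not taken from the patch.
    BinaryData data = BinaryData.fromString("Hello world");   // length is known: 11 bytes
    BlobParallelUploadOptions options = new BlobParallelUploadOptions(data);
    // Before this change the constructor set length = null, forcing the client to
    // chunk and buffer the stream to discover its size; now data.getLength() flows
    // through, so the upload can choose the single-shot or block path up front.
    blobAsyncClient.uploadWithResponse(options).block();

Because BinaryData.getLength() returns null when the source's size is unknown, the new assignment
degrades to the previous behavior in exactly those cases.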