Skip to content

Commit

Permalink
[Storage] Use FluxUtil for reliable download. (#22080)
Browse files Browse the repository at this point in the history
* first draft.

* indent.

* npes.

* timeout.

* fix empty case.

* checkstyle

* tests
  • Loading branch information
kasobol-msft authored Jun 4, 2021
1 parent f6094de commit ad00d73
Show file tree
Hide file tree
Showing 14 changed files with 203 additions and 198 deletions.
2 changes: 1 addition & 1 deletion sdk/storage/azure-storage-blob-batch/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.16.0</version> <!-- {x-version-update;com.azure:azure-core;dependency} -->
<version>1.17.0-beta.1</version> <!-- {x-version-update;unreleased_com.azure:azure-core;dependency} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand Down
2 changes: 1 addition & 1 deletion sdk/storage/azure-storage-blob-changefeed/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.16.0</version> <!-- {x-version-update;com.azure:azure-core;dependency} -->
<version>1.17.0-beta.1</version> <!-- {x-version-update;unreleased_com.azure:azure-core;dependency} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand Down
2 changes: 1 addition & 1 deletion sdk/storage/azure-storage-blob-cryptography/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.16.0</version> <!-- {x-version-update;com.azure:azure-core;dependency} -->
<version>1.17.0-beta.1</version> <!-- {x-version-update;unreleased_com.azure:azure-core;dependency} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand Down
2 changes: 1 addition & 1 deletion sdk/storage/azure-storage-blob-nio/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.16.0</version> <!-- {x-version-update;com.azure:azure-core;dependency} -->
<version>1.17.0-beta.1</version> <!-- {x-version-update;unreleased_com.azure:azure-core;dependency} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand Down
2 changes: 1 addition & 1 deletion sdk/storage/azure-storage-blob/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.16.0</version> <!-- {x-version-update;com.azure:azure-core;dependency} -->
<version>1.17.0-beta.1</version> <!-- {x-version-update;unreleased_com.azure:azure-core;dependency} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.azure.core.http.RequestConditions;
import com.azure.core.http.rest.Response;
import com.azure.core.http.rest.SimpleResponse;
import com.azure.core.http.rest.StreamResponse;
import com.azure.core.util.BinaryData;
import com.azure.core.util.Context;
import com.azure.core.util.CoreUtils;
Expand All @@ -22,12 +23,12 @@
import com.azure.storage.blob.BlobContainerClientBuilder;
import com.azure.storage.blob.BlobServiceAsyncClient;
import com.azure.storage.blob.BlobServiceVersion;
import com.azure.storage.blob.HttpGetterInfo;
import com.azure.storage.blob.ProgressReporter;
import com.azure.storage.blob.implementation.AzureBlobStorageImpl;
import com.azure.storage.blob.implementation.AzureBlobStorageImplBuilder;
import com.azure.storage.blob.implementation.models.BlobTag;
import com.azure.storage.blob.implementation.models.BlobTags;
import com.azure.storage.blob.implementation.models.BlobsDownloadHeaders;
import com.azure.storage.blob.implementation.models.BlobsGetAccountInfoHeaders;
import com.azure.storage.blob.implementation.models.BlobsGetPropertiesHeaders;
import com.azure.storage.blob.implementation.models.BlobsStartCopyFromURLHeaders;
Expand Down Expand Up @@ -99,6 +100,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -120,6 +122,7 @@
public class BlobAsyncClientBase {

private final ClientLogger logger = new ClientLogger(BlobAsyncClientBase.class);
private static final Duration TIMEOUT_VALUE = Duration.ofSeconds(60);

protected final AzureBlobStorageImpl azureBlobStorage;
private final String snapshot;
Expand Down Expand Up @@ -1002,32 +1005,78 @@ public Mono<BlobDownloadContentAsyncResponse> downloadContentWithResponse(

Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, DownloadRetryOptions options,
BlobRequestConditions requestConditions, boolean getRangeContentMd5, Context context) {
return downloadHelper(range, options, requestConditions, getRangeContentMd5, context)
.map(response -> new BlobDownloadAsyncResponse(response.getRequest(), response.getStatusCode(),
response.getHeaders(), response.getValue(), response.getDeserializedHeaders()));
}

private Mono<ReliableDownload> downloadHelper(BlobRange range, DownloadRetryOptions options,
BlobRequestConditions requestConditions, boolean getRangeContentMd5, Context context) {
range = range == null ? new BlobRange(0) : range;
BlobRange finalRange = range == null ? new BlobRange(0) : range;
Boolean getMD5 = getRangeContentMd5 ? getRangeContentMd5 : null;
requestConditions = requestConditions == null ? new BlobRequestConditions() : requestConditions;
HttpGetterInfo info = new HttpGetterInfo()
.setOffset(range.getOffset())
.setCount(range.getCount())
.setETag(requestConditions.getIfMatch());
BlobRequestConditions finalRequestConditions =
requestConditions == null ? new BlobRequestConditions() : requestConditions;
DownloadRetryOptions finalOptions = (options == null) ? new DownloadRetryOptions() : options;

return downloadRange(finalRange, finalRequestConditions, finalRequestConditions.getIfMatch(), getMD5, context)
.map(response -> {
String eTag = ModelHelper.getETag(response.getHeaders());
BlobsDownloadHeaders blobsDownloadHeaders =
ModelHelper.transformBlobDownloadHeaders(response.getHeaders());
BlobDownloadHeaders blobDownloadHeaders = ModelHelper.populateBlobDownloadHeaders(
blobsDownloadHeaders, ModelHelper.getErrorCode(response.getHeaders()));

/*
If the customer did not specify a count, they are reading to the end of the blob. Extract this value
from the response for better book keeping towards the end.
*/
long finalCount;
if (finalRange.getCount() == null) {
long blobLength = BlobAsyncClientBase.getBlobLength(blobDownloadHeaders);
finalCount = blobLength - finalRange.getOffset();
} else {
finalCount = finalRange.getCount();
}

Flux<ByteBuffer> bufferFlux = FluxUtil.createRetriableDownloadFlux(
() -> response.getValue().timeout(TIMEOUT_VALUE),
(throwable, offset) -> {
if (!(throwable instanceof IOException || throwable instanceof TimeoutException)) {
return Flux.error(throwable);
}

long newCount = finalCount - (offset - finalRange.getOffset());

/*
It is possible that the network stream will throw an error after emitting all data but before
completing. Issuing a retry at this stage would leave the download in a bad state with incorrect count
and offset values. Because we have read the intended amount of data, we can ignore the error at the end
of the stream.
*/
if (newCount == 0) {
logger.warning("Exception encountered in ReliableDownload after all data read from the network but "
+ "but before stream signaled completion. Returning success as all data was downloaded. "
+ "Exception message: " + throwable.getMessage());
return Flux.empty();
}

try {
return downloadRange(
new BlobRange(offset, newCount), finalRequestConditions, eTag, getMD5, context)
.flatMapMany(r -> r.getValue().timeout(TIMEOUT_VALUE));
} catch (Exception e) {
return Flux.error(e);
}
},
finalOptions.getMaxRetryRequests(),
finalRange.getOffset()
).switchIfEmpty(Flux.just(ByteBuffer.wrap(new byte[0])));

return new BlobDownloadAsyncResponse(response.getRequest(), response.getStatusCode(),
response.getHeaders(), bufferFlux, blobDownloadHeaders);
});
}

private Mono<StreamResponse> downloadRange(BlobRange range, BlobRequestConditions requestConditions,
String eTag, Boolean getMD5, Context context) {
return azureBlobStorage.getBlobs().downloadWithResponseAsync(containerName, blobName, snapshot, versionId, null,
range.toHeaderValue(), requestConditions.getLeaseId(), getMD5, null, requestConditions.getIfModifiedSince(),
requestConditions.getIfUnmodifiedSince(), requestConditions.getIfMatch(),
requestConditions.getIfUnmodifiedSince(), eTag,
requestConditions.getIfNoneMatch(), requestConditions.getTagsConditions(), null,
customerProvidedKey, context)
.map(response -> {
info.setETag(ModelHelper.getETag(response.getHeaders()));
return new ReliableDownload(response, options, info, updatedInfo ->
downloadHelper(new BlobRange(updatedInfo.getOffset(), updatedInfo.getCount()), options,
new BlobRequestConditions().setIfMatch(info.getETag()), false, context));
});
customerProvidedKey, context);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

/**
* This class automatically retries failed reads from a blob download stream.
Expand All @@ -30,7 +32,9 @@
* will be resumed from the point where the download failed. This allows for the download to be consumed as one
* continuous stream.
* </p>
* @deprecated use {@link com.azure.core.util.FluxUtil#createRetriableDownloadFlux(Supplier, BiFunction, int)} instead.
*/
@Deprecated
final class ReliableDownload {
private final ClientLogger logger = new ClientLogger(ReliableDownload.class);

Expand Down
Loading

0 comments on commit ad00d73

Please sign in to comment.