Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Storage][DataMovement] Refactor download transfers #47231

Merged
merged 14 commits into from
Nov 25, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ public static HttpRange ToHttpRange(ContentRange contentRange)
{
// Because constructing HttpRange is the start value, and the length of the range
// increment 1 on the end value, since the end value is the end index (not the length).
return new HttpRange(contentRange.Start.Value, contentRange.End.Value + 1);
long length = contentRange.End.Value - contentRange.Start.Value + 1;
return new HttpRange(contentRange.Start.Value, length);
}
return default;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Tests:
Arguments: &sizes
- --size 1024 --count 5000 --duration 60 --concurrency 64
- --size 10485760 --count 500 --duration 90 --concurrency 64
- --size 52428800 --count 200 --duration 120 --concurrency 64
- --size 1073741824 --count 5 --duration 120 --concurrency 64

- Test: upload
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ public partial class StorageResourceWriteToOffsetOptions
{
public StorageResourceWriteToOffsetOptions() { }
public string BlockId { get { throw null; } set { } }
public bool Initial { get { throw null; } set { } }
public long? Position { get { throw null; } set { } }
public Azure.Storage.DataMovement.StorageResourceItemProperties SourceProperties { get { throw null; } set { } }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ public partial class StorageResourceWriteToOffsetOptions
{
public StorageResourceWriteToOffsetOptions() { }
public string BlockId { get { throw null; } set { } }
public bool Initial { get { throw null; } set { } }
public long? Position { get { throw null; } set { } }
public Azure.Storage.DataMovement.StorageResourceItemProperties SourceProperties { get { throw null; } set { } }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ public partial class StorageResourceWriteToOffsetOptions
{
public StorageResourceWriteToOffsetOptions() { }
public string BlockId { get { throw null; } set { } }
public bool Initial { get { throw null; } set { } }
public long? Position { get { throw null; } set { } }
public Azure.Storage.DataMovement.StorageResourceItemProperties SourceProperties { get { throw null; } set { } }
}
Expand Down
246 changes: 28 additions & 218 deletions sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs

Large diffs are not rendered by default.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -126,17 +126,17 @@ protected internal override async Task CopyFromStreamAsync(
CancellationHelper.ThrowIfCancellationRequested(cancellationToken);

long position = options?.Position != default ? options.Position.Value : 0;
if (position == 0)
if (options?.Initial == true)
{
Create(overwrite);
}
if (streamLength > 0)
{
// Appends incoming stream to the local file resource
using (FileStream fileStream = new FileStream(
_uri.LocalPath,
FileMode.OpenOrCreate,
FileAccess.Write))
_uri.LocalPath,
FileMode.Open,
FileAccess.Write))
{
if (position > 0)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System.IO;

namespace Azure.Storage.DataMovement
{
internal class QueueDownloadChunkArgs
{
public long Offset { get; }
public long Length { get; }
public Stream Content { get; }

public QueueDownloadChunkArgs(
long offset,
long length,
Stream content)
{
Offset = offset;
Length = length;
Content = content;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ public class StorageResourceWriteToOffsetOptions
/// </summary>
public long? Position { get; set; }

/// <summary>
/// Optional. Specifies whether this write is for the initial chunk.
/// </summary>
public bool Initial { get; set; }

/// <summary>
/// Optional. Specifies the source properties to set in the destination.
/// </summary>
Expand Down
118 changes: 38 additions & 80 deletions sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,9 @@ public override async Task ProcessPartToChunkAsync()
internal async Task UnknownDownloadInternal()
{
Task<StorageResourceReadStreamResult> initialTask = _sourceResource.ReadStreamAsync(
position: 0,
length: _initialTransferSize,
_cancellationToken);
position: 0,
length: _initialTransferSize,
_cancellationToken);

try
{
Expand Down Expand Up @@ -254,7 +254,8 @@ internal async Task UnknownDownloadInternal()
offset: 0,
sourceLength: initialLength.Value,
source: initialResult.Content,
expectedLength: totalLength).ConfigureAwait(false);
expectedLength: totalLength,
initial: true).ConfigureAwait(false);
if (successfulInitialCopy)
{
ReportBytesWritten(initialLength.Value);
Expand Down Expand Up @@ -306,7 +307,8 @@ internal async Task LengthKnownDownloadInternal()
offset: 0,
sourceLength: downloadLength,
source: result.Content,
expectedLength: totalLength).ConfigureAwait(false);
expectedLength: totalLength,
initial: true).ConfigureAwait(false);
if (successfulCopy)
{
ReportBytesWritten(downloadLength);
Expand All @@ -324,20 +326,17 @@ internal async Task LengthKnownDownloadInternal()
#region PartitionedDownloader
private async Task QueueChunksToChannel(long initialLength, long totalLength)
{
// Get list of ranges of the blob
IList<HttpRange> ranges = GetRangesList(initialLength, totalLength, _transferChunkSize);

// Create Download Chunk event handler to manage when the ranges finish downloading
_downloadChunkHandler = GetDownloadChunkHandler(
_downloadChunkHandler = new DownloadChunkHandler(
currentTransferred: initialLength,
expectedLength: totalLength,
ranges: ranges,
jobPart: this);
GetDownloadChunkHandlerBehaviors(this),
_cancellationToken);

// Fill the queue with tasks to download each of the remaining
// ranges in the file
_queueingTasks = true;
foreach (HttpRange httpRange in ranges)
foreach (HttpRange httpRange in GetRanges(initialLength, totalLength, _transferChunkSize))
{
if (_cancellationToken.IsCancellationRequested)
{
Expand Down Expand Up @@ -394,48 +393,35 @@ internal async Task DownloadStreamingInternal(HttpRange range)
(long)range.Length,
_cancellationToken).ConfigureAwait(false);

// The chunk handler may have been disposed in failure case
if (_downloadChunkHandler != null)
// Stream the data from the network before queueing disk IO.
MemoryStream content = new((int)result.ContentLength.Value);
using (Stream dataStream = result.Content)
{
await _downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs(
transferId: _dataTransfer.Id,
success: true,
offset: range.Offset,
bytesTransferred: (long)range.Length,
result: result.Content,
exception: default,
false,
_cancellationToken)).ConfigureAwait(false);
await dataStream.CopyToAsync(
content,
DataMovementConstants.DefaultStreamCopyBufferSize,
_cancellationToken).ConfigureAwait(false);
}
content.Position = 0;

// The chunk handler may have been disposed in failure case
_downloadChunkHandler?.QueueChunk(new QueueDownloadChunkArgs(
offset: range.Offset,
length: (long)range.Length,
content: content));
}
catch (Exception ex)
{
if (_downloadChunkHandler != null)
{
await _downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs(
transferId: _dataTransfer.Id,
success: false,
offset: range.Offset,
bytesTransferred: (long)range.Length,
result: default,
exception: ex,
false,
_cancellationToken)).ConfigureAwait(false);
}
else
{
// If the _downloadChunkHandler has been disposed before we call to it
// we should at least filter the exception to error handling just in case.
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
}
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
}
}

public async Task<bool> CopyToStreamInternal(
long offset,
long sourceLength,
Stream source,
long expectedLength)
long expectedLength,
bool initial)
{
CancellationHelper.ThrowIfCancellationRequested(_cancellationToken);

Expand All @@ -450,6 +436,7 @@ await _destinationResource.CopyFromStreamAsync(
options: new StorageResourceWriteToOffsetOptions()
{
Position = offset,
Initial = initial,
},
cancellationToken: _cancellationToken).ConfigureAwait(false);
return true;
Expand All @@ -464,42 +451,14 @@ await _destinationResource.CopyFromStreamAsync(
return false;
}

public async Task WriteChunkToTempFile(string chunkFilePath, Stream source)
{
CancellationHelper.ThrowIfCancellationRequested(_cancellationToken);

using (FileStream fileStream = File.OpenWrite(chunkFilePath))
{
await source.CopyToAsync(
fileStream,
DataMovementConstants.DefaultStreamCopyBufferSize,
_cancellationToken)
.ConfigureAwait(false);
}
}

internal DownloadChunkHandler GetDownloadChunkHandler(
long currentTransferred,
long expectedLength,
IList<HttpRange> ranges,
UriToStreamJobPart jobPart)
=> new DownloadChunkHandler(
currentTransferred,
expectedLength,
ranges,
GetDownloadChunkHandlerBehaviors(jobPart),
ClientDiagnostics,
_cancellationToken);

internal static DownloadChunkHandler.Behaviors GetDownloadChunkHandlerBehaviors(UriToStreamJobPart job)
private static DownloadChunkHandler.Behaviors GetDownloadChunkHandlerBehaviors(UriToStreamJobPart jobPart)
{
return new DownloadChunkHandler.Behaviors()
{
CopyToDestinationFile = job.CopyToStreamInternal,
CopyToChunkFile = job.WriteChunkToTempFile,
ReportProgressInBytes = job.ReportBytesWritten,
InvokeFailedHandler = job.InvokeFailedArgAsync,
QueueCompleteFileDownload = job.QueueCompleteFileDownload
CopyToDestinationFile = jobPart.CopyToStreamInternal,
ReportProgressInBytes = jobPart.ReportBytesWritten,
InvokeFailedHandler = jobPart.InvokeFailedArgAsync,
QueueCompleteFileDownload = jobPart.QueueCompleteFileDownload
};
}

Expand All @@ -508,14 +467,12 @@ private Task QueueCompleteFileDownload()
return QueueChunkToChannelAsync(CompleteFileDownload);
}

private static IList<HttpRange> GetRangesList(long initialLength, long totalLength, long rangeSize)
private static IEnumerable<HttpRange> GetRanges(long initialLength, long totalLength, long rangeSize)
{
IList<HttpRange> list = new List<HttpRange>();
for (long offset = initialLength; offset < totalLength; offset += rangeSize)
{
list.Add(new HttpRange(offset, Math.Min(totalLength - offset, rangeSize)));
yield return new HttpRange(offset, Math.Min(totalLength - offset, rangeSize));
}
return list;
}
#endregion PartitionedDownloader

Expand Down Expand Up @@ -547,7 +504,8 @@ private async Task CreateZeroLengthDownload()
offset: 0,
sourceLength: 0,
source: default,
expectedLength: 0).ConfigureAwait(false);
expectedLength: 0,
initial: true).ConfigureAwait(false);
if (successfulCreation)
{
// Queue the work to end the download
Expand Down
Loading
Loading