From 8134f1272c2708e230cb49d68e747d6a7c5ad077 Mon Sep 17 00:00:00 2001 From: Jacob Lauzon Date: Wed, 13 Nov 2024 14:20:29 -0800 Subject: [PATCH 01/12] Refactor downloads to use memory instead of disk for early chunks --- .../BlockBlobDirectoryToDirectoryTests.cs | 1 + .../BlockBlobStartTransferDownloadTests.cs | 50 +++++++++ .../src/DownloadChunkHandler.cs | 102 +++++------------- .../src/DownloadRangeEventArgs.cs | 5 +- .../tests/DownloadChunkHandlerTests.cs | 64 +---------- 5 files changed, 81 insertions(+), 141 deletions(-) diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/BlockBlobDirectoryToDirectoryTests.cs b/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/BlockBlobDirectoryToDirectoryTests.cs index a1771d263f8b3..56d97860365ff 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/BlockBlobDirectoryToDirectoryTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/BlockBlobDirectoryToDirectoryTests.cs @@ -11,6 +11,7 @@ using BaseBlobs::Azure.Storage.Blobs.Models; using BaseBlobs::Azure.Storage.Blobs.Specialized; using DMBlobs::Azure.Storage.DataMovement.Blobs; +using NUnit.Framework; namespace Azure.Storage.DataMovement.Blobs.Tests { diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/BlockBlobStartTransferDownloadTests.cs b/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/BlockBlobStartTransferDownloadTests.cs index 0e4e4310aeb00..ab5cbef8e783a 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/BlockBlobStartTransferDownloadTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/BlockBlobStartTransferDownloadTests.cs @@ -13,6 +13,10 @@ using Azure.Storage.Test.Shared; using Azure.Core; using Azure.Core.TestFramework; +using NUnit.Framework; +using BaseBlobs::Azure.Storage.Blobs.Models; +using System.Threading; +using System.Linq; namespace Azure.Storage.DataMovement.Blobs.Tests { @@ -100,5 +104,51 @@ public BlobClientOptions GetOptions() return InstrumentClientOptions(options); } + + [Test] + public async Task DownloadTransferTest() + { + BlobServiceClient service = ClientBuilder.GetServiceClient_OAuth(TestEnvironment.Credential); + BlobContainerClient container = service.GetBlobContainerClient("test-download-1"); + using DisposingLocalDirectory testDirectory = DisposingLocalDirectory.GetTestDirectory(); + + //await container.CreateIfNotExistsAsync(); + //Random random = new(); + //foreach (int i in Enumerable.Range(0, 5)) + //{ + // byte[] data = new byte[1048576]; + // random.NextBytes(data); + // await container.UploadBlobAsync($"blob{i}", new BinaryData(data)); + //} + + BlobsStorageResourceProvider blobProvider = new(); + LocalFilesStorageResourceProvider localProvider = new(); + + TransferManager transferManager = new(); + DataTransferOptions options = new() + { + InitialTransferSize = 4096, + MaximumTransferChunkSize = 4096, + }; + TestEventsRaised testEvents = new(options); + DataTransfer transfer = await transferManager.StartTransferAsync( + blobProvider.FromClient(container), + localProvider.FromDirectory(testDirectory.DirectoryPath), + options); + + CancellationTokenSource cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + await transfer.WaitForCompletionAsync(cts.Token); + testEvents.AssertUnexpectedFailureCheck(); + + await foreach (BlobItem blob in container.GetBlobsAsync()) + { + string localPath = Path.Combine(testDirectory.DirectoryPath, blob.Name); + var response = await container.GetBlobClient(blob.Name).DownloadContentAsync(); + byte[] expected = response.Value.Content.ToArray(); + byte[] actual = File.ReadAllBytes(localPath); + + Assert.That(actual, Is.EqualTo(expected)); + } + } } } diff --git a/sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs b/sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs index 7e7a7164ff15f..72f18c1874745 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs @@ -62,7 +62,6 @@ public struct Behaviors /// List that holds all ranges of chunks to process. /// private readonly IList _ranges; - private int _rangesCount; /// /// Holds which range we are currently waiting on to download. /// @@ -72,7 +71,7 @@ public struct Behaviors /// If any download chunks come in early before the chunk before it /// to copy to the file, let's hold it in order here before we copy it over. /// - private ConcurrentDictionary _rangesCompleted; + private Dictionary _pendingChunks; internal ClientDiagnostics ClientDiagnostics { get; } @@ -127,6 +126,7 @@ public DownloadChunkHandler( _expectedLength = expectedLength; _ranges = ranges; + _pendingChunks = new(); if (expectedLength <= 0) { @@ -148,10 +148,6 @@ public DownloadChunkHandler( _queueCompleteFileDownload = behaviors.QueueCompleteFileDownload ?? throw Errors.ArgumentNull(nameof(behaviors.QueueCompleteFileDownload)); - _rangesCount = ranges.Count; - // Set size of the list of null streams - _rangesCompleted = new ConcurrentDictionary(); - _downloadChunkEventHandler += DownloadChunkEvent; ClientDiagnostics = clientDiagnostics; } @@ -198,21 +194,8 @@ private async Task NotifyOfPendingChunkDownloadEvents() long currentRangeOffset = _ranges[_currentRangeIndex].Offset; if (currentRangeOffset < args.Offset) { - // One of the chunks finished downloading before the chunk(s) - // before it (early bird, or the last chunk) - // Save the chunk to a temporary file to append later - string chunkFilePath = Path.GetTempFileName(); - using (Stream chunkContent = args.Result) - { - await _copyToChunkFile(chunkFilePath, chunkContent).ConfigureAwait(false); - } - if (!_rangesCompleted.TryAdd(args.Offset, chunkFilePath)) - { - // Throw an error here that we were unable to idenity the - // the range that has come back to us. We should never see this error - // since we were the ones who calculated the range. - throw Errors.InvalidDownloadOffset(args.Offset, args.BytesTransferred); - } + // Early chunk, add to pending + _pendingChunks.Add(args.Offset, args.Result); } else if (currentRangeOffset == args.Offset) { @@ -223,13 +206,13 @@ private async Task NotifyOfPendingChunkDownloadEvents() { await _copyToDestinationFile( args.Offset, - args.BytesTransferred, + args.Length, content, _expectedLength).ConfigureAwait(false); } - UpdateBytesAndRange(args.BytesTransferred); + UpdateBytesAndRange(args.Length); - await AppendEarlyChunksToFile().ConfigureAwait(false); + await AppendPendingChunks().ConfigureAwait(false); // Check if we finished downloading the blob if (_bytesTransferred == _expectedLength) @@ -242,7 +225,7 @@ await _copyToDestinationFile( // We should never reach this point because that means // the range that came back was less than the next range that is supposed // to be copied to the file - throw Errors.InvalidDownloadOffset(args.Offset, args.BytesTransferred); + throw Errors.InvalidDownloadOffset(args.Offset, args.Length); } } } @@ -268,80 +251,43 @@ await _downloadChunkEventHandler.RaiseAsync( } } - private async Task AppendEarlyChunksToFile() + private async Task AppendPendingChunks() { - // If there are any other chunks that have already been downloaded that - // can be appended to the file, let's do it now. - while ((_bytesTransferred < _expectedLength) && - (_currentRangeIndex < _rangesCount) && - _rangesCompleted.ContainsKey(_ranges[_currentRangeIndex].Offset)) + while (_currentRangeIndex < _ranges.Count && + _pendingChunks.ContainsKey(_ranges[_currentRangeIndex].Offset)) { HttpRange currentRange = _ranges[_currentRangeIndex]; - if (_rangesCompleted.TryRemove(currentRange.Offset, out string chunkFilePath)) + using (Stream nextChunk = _pendingChunks[currentRange.Offset]) { - if (File.Exists(chunkFilePath)) - { - using (Stream content = File.OpenRead(chunkFilePath)) - { - await _copyToDestinationFile( - currentRange.Offset, - currentRange.Length.Value, - content, - _expectedLength).ConfigureAwait(false); - } - // Delete the temporary chunk file that's no longer needed - File.Delete(chunkFilePath); - } - else - { - throw Errors.TempChunkFileNotFound( - offset: currentRange.Offset, - length: currentRange.Length.Value, - filePath: chunkFilePath); - } + await _copyToDestinationFile( + currentRange.Offset, + currentRange.Length.Value, + nextChunk, + _expectedLength).ConfigureAwait(false); } - else - { - throw Errors.InvalidDownloadOffset(currentRange.Offset, currentRange.Length.Value); - } - - // Increment the current range we are expect, if it's null then - // that's the next one we have to wait on. - UpdateBytesAndRange((long)_ranges[_currentRangeIndex].Length); + _pendingChunks.Remove(currentRange.Offset); + UpdateBytesAndRange((long)currentRange.Length); } } private async Task InvokeFailedEvent(Exception ex) { - foreach (HttpRange range in _ranges) + foreach (Stream chunkStream in _pendingChunks.Values) { - if (_rangesCompleted.TryRemove(range.Offset, out string tempChunkFile)) - { - if (File.Exists(tempChunkFile)) - { - try - { - File.Delete(tempChunkFile); - } - catch (Exception deleteException) - { - await _invokeFailedEventHandler(deleteException).ConfigureAwait(false); - } - } - } + chunkStream.Dispose(); } + _pendingChunks.Clear(); await _invokeFailedEventHandler(ex).ConfigureAwait(false); } /// - /// Update the progress handler and the current range we are waiting on. + /// Moves the downloader to the next range and updates/reports bytes transferred. /// /// private void UpdateBytesAndRange(long bytesDownloaded) { - // don't need to use Interlocked since this is the only thread reading and updating these values - _bytesTransferred += bytesDownloaded; _currentRangeIndex++; + _bytesTransferred += bytesDownloaded; _reportProgressInBytes(bytesDownloaded); } } diff --git a/sdk/storage/Azure.Storage.DataMovement/src/DownloadRangeEventArgs.cs b/sdk/storage/Azure.Storage.DataMovement/src/DownloadRangeEventArgs.cs index 2a1ecddce9ddf..5e3d64a0e1fdb 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/DownloadRangeEventArgs.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/DownloadRangeEventArgs.cs @@ -4,7 +4,6 @@ using System; using System.IO; using System.Threading; -using Azure.Core; using Azure.Storage.Common; namespace Azure.Storage.DataMovement @@ -18,7 +17,7 @@ internal class DownloadRangeEventArgs : DataTransferEventArgs /// /// Will be 0 if Success is false /// - public long BytesTransferred { get; } + public long Length { get; } /// /// Stream results of the range downloaded if Sucess is true @@ -67,7 +66,7 @@ public DownloadRangeEventArgs( } Success = success; Offset = offset; - BytesTransferred = bytesTransferred; + Length = bytesTransferred; Result = result; Exception = exception; } diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/DownloadChunkHandlerTests.cs b/sdk/storage/Azure.Storage.DataMovement/tests/DownloadChunkHandlerTests.cs index 4675e14cb2cf2..7b276e82880d9 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/DownloadChunkHandlerTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/DownloadChunkHandlerTests.cs @@ -99,14 +99,6 @@ private void VerifyDelegateInvocations( return mock; } - private Mock GetExceptionCopyToChunkFileTask() - { - var mock = new Mock(MockBehavior.Strict); - mock.Setup(del => del(It.IsNotNull(), It.IsNotNull())) - .Throws(new UnauthorizedAccessException()); - return mock; - } - private Mock GetExceptionCopyToDestinationFileTask() { var mock = new Mock(MockBehavior.Strict); @@ -399,7 +391,7 @@ public async Task MultipleChunkTransfer_EarlyChunks(long blockSize) PredictableStream content = new PredictableStream(blockSize); - // Act - Make initial range event + // Act - The second chunk returns first await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( transferId: "fake-id", success: true, @@ -415,11 +407,11 @@ await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( behaviors: mockBehaviors, expectedFailureCount: 0, expectedCopyDestinationCount: 0, - expectedCopyChunkCount: 1, + expectedCopyChunkCount: 0, expectedReportProgressCount: 0, expectedCompleteFileCount: 0); - // Act - Make the repeat at the same offset to cause an error. + // Act - The first chunk is then returned await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( transferId: "fake-id", success: true, @@ -435,7 +427,7 @@ await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( behaviors: mockBehaviors, expectedFailureCount: 0, expectedCopyDestinationCount: 2, - expectedCopyChunkCount: 1, + expectedCopyChunkCount: 0, expectedReportProgressCount: 2, expectedCompleteFileCount: 1); } @@ -499,54 +491,6 @@ public async Task MultipleChunkTransfer_MultipleProcesses(long blockSize, int ta expectedCompleteFileCount: 1); } - [Test] - public async Task GetCopyToChunkFileTask_ExpectedFailure() - { - // Arrange - MockDownloadChunkBehaviors mockBehaviors = GetMockDownloadChunkBehaviors(); - mockBehaviors.CopyToChunkFileTask = GetExceptionCopyToChunkFileTask(); - int blockSize = 512; - long expectedLength = blockSize * 2; - List ranges = GetRanges(blockSize, expectedLength); - - var downloadChunkHandler = new DownloadChunkHandler( - currentTransferred: 0, - expectedLength: expectedLength, - ranges: ranges, - behaviors: new DownloadChunkHandler.Behaviors - { - CopyToDestinationFile = mockBehaviors.CopyToDestinationFileTask.Object, - CopyToChunkFile = mockBehaviors.CopyToChunkFileTask.Object, - QueueCompleteFileDownload = mockBehaviors.QueueCompleteFileDownloadTask.Object, - ReportProgressInBytes = mockBehaviors.ReportProgressInBytesTask.Object, - InvokeFailedHandler = mockBehaviors.InvokeFailedEventHandlerTask.Object, - }, - ClientDiagnostics, - cancellationToken: CancellationToken.None); - - PredictableStream content = new PredictableStream(blockSize); - - // Act - await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( - transferId: "fake-id", - success: true, - offset: blockSize, - bytesTransferred: blockSize, - result: content, - exception: default, - isRunningSynchronously: false, - cancellationToken: CancellationToken.None)); - - // Assert - VerifyDelegateInvocations( - behaviors: mockBehaviors, - expectedFailureCount: 1, - expectedCopyDestinationCount: 0, - expectedCopyChunkCount: 1, - expectedReportProgressCount: 0, - expectedCompleteFileCount: 0); - } - [Test] public async Task GetCopyToDestinationFileTask_ExpectedFailure() { From 85655640f81216de061afb1f42452882a838984c Mon Sep 17 00:00:00 2001 From: Jacob Lauzon Date: Wed, 13 Nov 2024 14:50:22 -0800 Subject: [PATCH 02/12] Remove now unused behavior, clean up --- .../src/DownloadChunkHandler.cs | 32 +++++------------ .../src/UriToStreamJobPart.cs | 15 -------- .../tests/DownloadChunkHandlerTests.cs | 35 ------------------- 3 files changed, 9 insertions(+), 73 deletions(-) diff --git a/sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs b/sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs index 72f18c1874745..2e48ba02997f0 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs @@ -2,7 +2,6 @@ // Licensed under the MIT License. using System; -using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; using System.Threading; @@ -16,19 +15,14 @@ namespace Azure.Storage.DataMovement { internal class DownloadChunkHandler : IDisposable { - // Indicates whether the current thread is processing stage chunks. - private static Task _processDownloadRangeEvents; - #region Delegate Definitions public delegate Task CopyToDestinationFileInternal(long offset, long length, Stream stream, long expectedLength); - public delegate Task CopyToChunkFileInternal(string chunkFilePath, Stream stream); public delegate void ReportProgressInBytes(long bytesWritten); public delegate Task QueueCompleteFileDownloadInternal(); public delegate Task InvokeFailedEventHandlerInternal(Exception ex); #endregion Delegate Definitions private readonly CopyToDestinationFileInternal _copyToDestinationFile; - private readonly CopyToChunkFileInternal _copyToChunkFile; private readonly ReportProgressInBytes _reportProgressInBytes; private readonly InvokeFailedEventHandlerInternal _invokeFailedEventHandler; private readonly QueueCompleteFileDownloadInternal _queueCompleteFileDownload; @@ -36,12 +30,8 @@ internal class DownloadChunkHandler : IDisposable public struct Behaviors { public CopyToDestinationFileInternal CopyToDestinationFile { get; set; } - - public CopyToChunkFileInternal CopyToChunkFile { get; set; } public ReportProgressInBytes ReportProgressInBytes { get; set; } - public InvokeFailedEventHandlerInternal InvokeFailedHandler { get; set; } - public QueueCompleteFileDownloadInternal QueueCompleteFileDownload { get; set; } } @@ -49,11 +39,12 @@ public struct Behaviors internal SyncAsyncEventHandler GetDownloadChunkHandler() => _downloadChunkEventHandler; /// - /// Create channel of to keep track of that are - /// waiting to update the bytesTransferred and other required operations. + /// Create channel of to keep track to handle + /// writing downloaded chunks to the destination as well as tracking overall progress. /// private readonly Channel _downloadRangeChannel; - private CancellationToken _cancellationToken; + private readonly Task _processDownloadRangeEvents; + private readonly CancellationToken _cancellationToken; private long _bytesTransferred; private readonly long _expectedLength; @@ -71,7 +62,7 @@ public struct Behaviors /// If any download chunks come in early before the chunk before it /// to copy to the file, let's hold it in order here before we copy it over. /// - private Dictionary _pendingChunks; + private readonly Dictionary _pendingChunks; internal ClientDiagnostics ClientDiagnostics { get; } @@ -111,17 +102,15 @@ public DownloadChunkHandler( _bytesTransferred = currentTransferred; _currentRangeIndex = 0; - // Create channel of finished Stage Chunk Args to update the bytesTransferred - // and for ending tasks like commit block. // The size of the channel should never exceed 50k (limit on blocks in a block blob). // and that's in the worst case that we never read from the channel and had a maximum chunk blob. _downloadRangeChannel = Channel.CreateUnbounded( new UnboundedChannelOptions() { - // Single reader is required as we can only read and write to bytesTransferred value + // Single reader is required as we can only have one writer to the destination. SingleReader = true, }); - _processDownloadRangeEvents = Task.Run(() => NotifyOfPendingChunkDownloadEvents()); + _processDownloadRangeEvents = Task.Run(NotifyOfPendingChunkDownloadEvents); _cancellationToken = cancellationToken; _expectedLength = expectedLength; @@ -139,8 +128,6 @@ public DownloadChunkHandler( // Set values _copyToDestinationFile = behaviors.CopyToDestinationFile ?? throw Errors.ArgumentNull(nameof(behaviors.CopyToDestinationFile)); - _copyToChunkFile = behaviors.CopyToChunkFile - ?? throw Errors.ArgumentNull(nameof(behaviors.CopyToChunkFile)); _reportProgressInBytes = behaviors.ReportProgressInBytes ?? throw Errors.ArgumentNull(nameof(behaviors.ReportProgressInBytes)); _invokeFailedEventHandler = behaviors.InvokeFailedHandler @@ -199,9 +186,7 @@ private async Task NotifyOfPendingChunkDownloadEvents() } else if (currentRangeOffset == args.Offset) { - // Start Copying the response to the file stream and any other chunks after - // Most of the time we will always get the next chunk first so the loop - // on averages runs once. + // Copy the current chunk to the destination using (Stream content = args.Result) { await _copyToDestinationFile( @@ -212,6 +197,7 @@ await _copyToDestinationFile( } UpdateBytesAndRange(args.Length); + // Check if the next chunks are already downloaeded and copy those await AppendPendingChunks().ConfigureAwait(false); // Check if we finished downloading the blob diff --git a/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs b/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs index ecaed741f19a0..4bc1110f03bff 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs @@ -464,20 +464,6 @@ await _destinationResource.CopyFromStreamAsync( return false; } - public async Task WriteChunkToTempFile(string chunkFilePath, Stream source) - { - CancellationHelper.ThrowIfCancellationRequested(_cancellationToken); - - using (FileStream fileStream = File.OpenWrite(chunkFilePath)) - { - await source.CopyToAsync( - fileStream, - DataMovementConstants.DefaultStreamCopyBufferSize, - _cancellationToken) - .ConfigureAwait(false); - } - } - internal DownloadChunkHandler GetDownloadChunkHandler( long currentTransferred, long expectedLength, @@ -496,7 +482,6 @@ internal static DownloadChunkHandler.Behaviors GetDownloadChunkHandlerBehaviors( return new DownloadChunkHandler.Behaviors() { CopyToDestinationFile = job.CopyToStreamInternal, - CopyToChunkFile = job.WriteChunkToTempFile, ReportProgressInBytes = job.ReportBytesWritten, InvokeFailedHandler = job.InvokeFailedArgAsync, QueueCompleteFileDownload = job.QueueCompleteFileDownload diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/DownloadChunkHandlerTests.cs b/sdk/storage/Azure.Storage.DataMovement/tests/DownloadChunkHandlerTests.cs index 7b276e82880d9..8b58aabaef9cf 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/DownloadChunkHandlerTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/DownloadChunkHandlerTests.cs @@ -23,7 +23,6 @@ public DownloadChunkHandlerTests() { } private readonly int _maxDelayInSec = 1; private readonly string _failedEventMsg = "Amount of Failed Event Handler calls was incorrect."; private readonly string _copyToDestinationMsg = "Amount of Copy To Destination Task calls were incorrect."; - private readonly string _copyToChunkFileMsg = "Amount of Copy To Chunk File Task calls were incorrect."; private readonly string _reportProgressInBytesMsg = "Amount of Progress amount calls were incorrect."; private readonly string _completeFileDownloadMsg = "Complete File Download call amount calls were incorrect."; @@ -33,7 +32,6 @@ private void VerifyDelegateInvocations( MockDownloadChunkBehaviors behaviors, int expectedFailureCount, int expectedCopyDestinationCount, - int expectedCopyChunkCount, int expectedReportProgressCount, int expectedCompleteFileCount, int maxWaitTimeInSec = 6) @@ -42,14 +40,12 @@ private void VerifyDelegateInvocations( CancellationToken cancellationToken = cancellationSource.Token; int currentFailedEventCount = behaviors.InvokeFailedEventHandlerTask.Invocations.Count; int currentCopyDestinationCount = behaviors.CopyToDestinationFileTask.Invocations.Count; - int currentCopyChunkCount = behaviors.CopyToChunkFileTask.Invocations.Count; int currentProgressReportedCount = behaviors.ReportProgressInBytesTask.Invocations.Count; int currentCompleteDownloadCount = behaviors.QueueCompleteFileDownloadTask.Invocations.Count; try { while (currentFailedEventCount != expectedFailureCount || currentCopyDestinationCount != expectedCopyDestinationCount - || currentCopyChunkCount != expectedCopyChunkCount || currentProgressReportedCount != expectedReportProgressCount || currentCompleteDownloadCount != expectedCompleteFileCount) { @@ -63,8 +59,6 @@ private void VerifyDelegateInvocations( Assert.LessOrEqual(currentFailedEventCount, expectedFailureCount, _failedEventMsg); currentCopyDestinationCount = behaviors.CopyToDestinationFileTask.Invocations.Count; Assert.LessOrEqual(currentCopyDestinationCount, expectedCopyDestinationCount, _copyToDestinationMsg); - currentCopyChunkCount = behaviors.CopyToChunkFileTask.Invocations.Count; - Assert.LessOrEqual(currentCopyChunkCount, expectedCopyChunkCount, _copyToChunkFileMsg); currentProgressReportedCount = behaviors.ReportProgressInBytesTask.Invocations.Count; Assert.LessOrEqual(currentProgressReportedCount, expectedReportProgressCount, _reportProgressInBytesMsg); currentCompleteDownloadCount = behaviors.QueueCompleteFileDownloadTask.Invocations.Count; @@ -76,7 +70,6 @@ private void VerifyDelegateInvocations( string message = "Timed out waiting for the correct amount of invocations for each task\n" + $"Current Failed Event Invocations: {currentFailedEventCount} | Expected: {expectedFailureCount}\n" + $"Current Copy Destination Invocations: {currentCopyDestinationCount} | Expected: {expectedCopyDestinationCount}\n" + - $"Current Copy Chunk Invocations: {currentCopyChunkCount} | Expected: {expectedCopyChunkCount}\n" + $"Current Progress Reported Invocations: {currentProgressReportedCount} | Expected: {expectedReportProgressCount}\n" + $"Current Complete Download Invocations: {currentCompleteDownloadCount} | Expected: {expectedCompleteFileCount}"; Assert.Fail(message); @@ -91,14 +84,6 @@ private void VerifyDelegateInvocations( return mock; } - private Mock GetCopyToChunkFileTask() - { - var mock = new Mock(MockBehavior.Strict); - mock.Setup(del => del(It.IsNotNull(),It.IsNotNull())) - .Returns(Task.CompletedTask); - return mock; - } - private Mock GetExceptionCopyToDestinationFileTask() { var mock = new Mock(MockBehavior.Strict); @@ -141,7 +126,6 @@ private void VerifyDelegateInvocations( internal struct MockDownloadChunkBehaviors { public Mock CopyToDestinationFileTask; - public Mock CopyToChunkFileTask; public Mock ReportProgressInBytesTask; public Mock QueueCompleteFileDownloadTask; public Mock InvokeFailedEventHandlerTask; @@ -151,7 +135,6 @@ private MockDownloadChunkBehaviors GetMockDownloadChunkBehaviors() => new MockDownloadChunkBehaviors() { CopyToDestinationFileTask = GetCopyToDestinationFileTask(), - CopyToChunkFileTask = GetCopyToChunkFileTask(), ReportProgressInBytesTask = GetReportProgressInBytesTask(), QueueCompleteFileDownloadTask = GetQueueCompleteFileDownloadTask(), InvokeFailedEventHandlerTask = GetInvokeFailedEventHandlerTask() @@ -207,7 +190,6 @@ public async Task OneChunkTransfer(long blockSize) new DownloadChunkHandler.Behaviors { CopyToDestinationFile = mockBehaviors.CopyToDestinationFileTask.Object, - CopyToChunkFile = mockBehaviors.CopyToChunkFileTask.Object, QueueCompleteFileDownload = mockBehaviors.QueueCompleteFileDownloadTask.Object, ReportProgressInBytes = mockBehaviors.ReportProgressInBytesTask.Object, InvokeFailedHandler = mockBehaviors.InvokeFailedEventHandlerTask.Object, @@ -233,7 +215,6 @@ await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( behaviors: mockBehaviors, expectedFailureCount: 0, expectedCopyDestinationCount: 1, - expectedCopyChunkCount: 0, expectedReportProgressCount: 1, expectedCompleteFileCount: 1); } @@ -254,7 +235,6 @@ public async Task MultipleChunkTransfer(long blockSize) behaviors: new DownloadChunkHandler.Behaviors { CopyToDestinationFile = mockBehaviors.CopyToDestinationFileTask.Object, - CopyToChunkFile = mockBehaviors.CopyToChunkFileTask.Object, QueueCompleteFileDownload = mockBehaviors.QueueCompleteFileDownloadTask.Object, ReportProgressInBytes = mockBehaviors.ReportProgressInBytesTask.Object, InvokeFailedHandler = mockBehaviors.InvokeFailedEventHandlerTask.Object, @@ -280,7 +260,6 @@ await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( behaviors: mockBehaviors, expectedFailureCount: 0, expectedCopyDestinationCount: 1, - expectedCopyChunkCount: 0, expectedReportProgressCount: 1, expectedCompleteFileCount: 0); @@ -302,7 +281,6 @@ await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( behaviors: mockBehaviors, expectedFailureCount: 0, expectedCopyDestinationCount: 2, - expectedCopyChunkCount: 0, expectedReportProgressCount: 2, expectedCompleteFileCount: 1); } @@ -323,7 +301,6 @@ public async Task MultipleChunkTransfer_UnexpectedOffsetError(long blockSize) ranges: ranges, behaviors: new DownloadChunkHandler.Behaviors { CopyToDestinationFile = mockBehaviors.CopyToDestinationFileTask.Object, - CopyToChunkFile = mockBehaviors.CopyToChunkFileTask.Object, QueueCompleteFileDownload = mockBehaviors.QueueCompleteFileDownloadTask.Object, ReportProgressInBytes = mockBehaviors.ReportProgressInBytesTask.Object, InvokeFailedHandler = mockBehaviors.InvokeFailedEventHandlerTask.Object }, @@ -359,7 +336,6 @@ await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( behaviors: mockBehaviors, expectedFailureCount: 1, expectedCopyDestinationCount: 1, - expectedCopyChunkCount: 0, expectedReportProgressCount: 1, expectedCompleteFileCount: 0); } @@ -381,7 +357,6 @@ public async Task MultipleChunkTransfer_EarlyChunks(long blockSize) behaviors: new DownloadChunkHandler.Behaviors { CopyToDestinationFile = mockBehaviors.CopyToDestinationFileTask.Object, - CopyToChunkFile = mockBehaviors.CopyToChunkFileTask.Object, QueueCompleteFileDownload = mockBehaviors.QueueCompleteFileDownloadTask.Object, ReportProgressInBytes = mockBehaviors.ReportProgressInBytesTask.Object, InvokeFailedHandler = mockBehaviors.InvokeFailedEventHandlerTask.Object, @@ -407,7 +382,6 @@ await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( behaviors: mockBehaviors, expectedFailureCount: 0, expectedCopyDestinationCount: 0, - expectedCopyChunkCount: 0, expectedReportProgressCount: 0, expectedCompleteFileCount: 0); @@ -427,7 +401,6 @@ await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( behaviors: mockBehaviors, expectedFailureCount: 0, expectedCopyDestinationCount: 2, - expectedCopyChunkCount: 0, expectedReportProgressCount: 2, expectedCompleteFileCount: 1); } @@ -450,7 +423,6 @@ public async Task MultipleChunkTransfer_MultipleProcesses(long blockSize, int ta behaviors: new DownloadChunkHandler.Behaviors { CopyToDestinationFile = mockBehaviors.CopyToDestinationFileTask.Object, - CopyToChunkFile = mockBehaviors.CopyToChunkFileTask.Object, QueueCompleteFileDownload = mockBehaviors.QueueCompleteFileDownloadTask.Object, ReportProgressInBytes = mockBehaviors.ReportProgressInBytesTask.Object, InvokeFailedHandler = mockBehaviors.InvokeFailedEventHandlerTask.Object, @@ -486,7 +458,6 @@ public async Task MultipleChunkTransfer_MultipleProcesses(long blockSize, int ta behaviors: mockBehaviors, expectedFailureCount: 0, expectedCopyDestinationCount: taskSize, - expectedCopyChunkCount: 0, expectedReportProgressCount: taskSize, expectedCompleteFileCount: 1); } @@ -508,7 +479,6 @@ public async Task GetCopyToDestinationFileTask_ExpectedFailure() behaviors: new DownloadChunkHandler.Behaviors { CopyToDestinationFile = mockBehaviors.CopyToDestinationFileTask.Object, - CopyToChunkFile = mockBehaviors.CopyToChunkFileTask.Object, QueueCompleteFileDownload = mockBehaviors.QueueCompleteFileDownloadTask.Object, ReportProgressInBytes = mockBehaviors.ReportProgressInBytesTask.Object, InvokeFailedHandler = mockBehaviors.InvokeFailedEventHandlerTask.Object, @@ -534,7 +504,6 @@ await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( behaviors: mockBehaviors, expectedFailureCount: 1, expectedCopyDestinationCount: 1, - expectedCopyChunkCount: 0, expectedReportProgressCount: 0, expectedCompleteFileCount: 0); } @@ -555,7 +524,6 @@ public async Task QueueCompleteFileDownloadTask_ExpectedFailure() behaviors: new DownloadChunkHandler.Behaviors { CopyToDestinationFile = mockBehaviors.CopyToDestinationFileTask.Object, - CopyToChunkFile = mockBehaviors.CopyToChunkFileTask.Object, QueueCompleteFileDownload = mockBehaviors.QueueCompleteFileDownloadTask.Object, ReportProgressInBytes = mockBehaviors.ReportProgressInBytesTask.Object, InvokeFailedHandler = mockBehaviors.InvokeFailedEventHandlerTask.Object, @@ -581,7 +549,6 @@ await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( behaviors: mockBehaviors, expectedFailureCount: 1, expectedCopyDestinationCount: 1, - expectedCopyChunkCount: 0, expectedReportProgressCount: 1, expectedCompleteFileCount: 1); } @@ -602,7 +569,6 @@ public async Task DisposedEventHandler() behaviors: new DownloadChunkHandler.Behaviors { CopyToDestinationFile = mockBehaviors.CopyToDestinationFileTask.Object, - CopyToChunkFile = mockBehaviors.CopyToChunkFileTask.Object, QueueCompleteFileDownload = mockBehaviors.QueueCompleteFileDownloadTask.Object, ReportProgressInBytes = mockBehaviors.ReportProgressInBytesTask.Object, InvokeFailedHandler = mockBehaviors.InvokeFailedEventHandlerTask.Object, @@ -620,7 +586,6 @@ public async Task DisposedEventHandler() behaviors: mockBehaviors, expectedFailureCount: 0, expectedCopyDestinationCount: 0, - expectedCopyChunkCount: 0, expectedReportProgressCount: 0, expectedCompleteFileCount: 0); } From 4ea51ef7a035c0fb02059fadbaa9f8df5d7fa71d Mon Sep 17 00:00:00 2001 From: Jacob Lauzon Date: Wed, 13 Nov 2024 17:52:20 -0800 Subject: [PATCH 03/12] Remove eventing from DownloadChunkHandler --- .../src/DownloadChunkHandler.cs | 73 +++---------------- .../src/UriToStreamJobPart.cs | 51 ++++--------- .../tests/DownloadChunkHandlerTests.cs | 54 ++++++-------- 3 files changed, 45 insertions(+), 133 deletions(-) diff --git a/sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs b/sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs index 2e48ba02997f0..230d4dc1c8873 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs @@ -7,7 +7,6 @@ using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; -using Azure.Core; using Azure.Core.Pipeline; using Azure.Storage.Common; @@ -35,9 +34,6 @@ public struct Behaviors public QueueCompleteFileDownloadInternal QueueCompleteFileDownload { get; set; } } - private event SyncAsyncEventHandler _downloadChunkEventHandler; - internal SyncAsyncEventHandler GetDownloadChunkHandler() => _downloadChunkEventHandler; - /// /// Create channel of to keep track to handle /// writing downloaded chunks to the destination as well as tracking overall progress. @@ -64,8 +60,6 @@ public struct Behaviors /// private readonly Dictionary _pendingChunks; - internal ClientDiagnostics ClientDiagnostics { get; } - /// /// The controller for downloading the chunks to each file. /// @@ -81,9 +75,6 @@ public struct Behaviors /// /// Contains all the supported function calls. /// - /// - /// ClientDiagnostics for handler logging. - /// /// /// Cancellation token of the job part or job to cancel any ongoing waiting in the /// download chunk handler to prevent infinite waiting. @@ -94,7 +85,6 @@ public DownloadChunkHandler( long expectedLength, IList ranges, Behaviors behaviors, - ClientDiagnostics clientDiagnostics, CancellationToken cancellationToken) { // Set bytes transferred to the length of bytes we got back from the initial @@ -123,7 +113,6 @@ public DownloadChunkHandler( } Argument.AssertNotNullOrEmpty(ranges, nameof(ranges)); Argument.AssertNotNull(behaviors, nameof(behaviors)); - Argument.AssertNotNull(clientDiagnostics, nameof(clientDiagnostics)); // Set values _copyToDestinationFile = behaviors.CopyToDestinationFile @@ -134,40 +123,21 @@ public DownloadChunkHandler( ?? throw Errors.ArgumentNull(nameof(behaviors.InvokeFailedHandler)); _queueCompleteFileDownload = behaviors.QueueCompleteFileDownload ?? throw Errors.ArgumentNull(nameof(behaviors.QueueCompleteFileDownload)); - - _downloadChunkEventHandler += DownloadChunkEvent; - ClientDiagnostics = clientDiagnostics; } public void Dispose() { + foreach (Stream chunkStream in _pendingChunks.Values) + { + chunkStream.Dispose(); + } + _pendingChunks.Clear(); _downloadRangeChannel.Writer.TryComplete(); - DisposeHandlers(); } - private void DisposeHandlers() + public void QueueChunk(DownloadRangeEventArgs args) { - _downloadChunkEventHandler -= DownloadChunkEvent; - } - - private async Task DownloadChunkEvent(DownloadRangeEventArgs args) - { - try - { - if (args.Success) - { - _downloadRangeChannel.Writer.TryWrite(args); - } - else - { - // Report back failed event. - throw args.Exception; - } - } - catch (Exception ex) - { - await InvokeFailedEvent(ex).ConfigureAwait(false); - } + _downloadRangeChannel.Writer.TryWrite(args); } private async Task NotifyOfPendingChunkDownloadEvents() @@ -217,23 +187,8 @@ await _copyToDestinationFile( } catch (Exception ex) { - await InvokeFailedEvent(ex).ConfigureAwait(false); - } - } - - public async Task InvokeEvent(DownloadRangeEventArgs args) - { - // There's a race condition where the event handler was disposed and an event - // was already invoked, we should skip over this as the download chunk handler - // was already disposed, and we should just ignore any more incoming events. - if (_downloadChunkEventHandler != null) - { - await _downloadChunkEventHandler.RaiseAsync( - args, - nameof(DownloadChunkHandler), - nameof(_downloadChunkEventHandler), - ClientDiagnostics) - .ConfigureAwait(false); + // This will trigger the job part to call Dispose on this object + await _invokeFailedEventHandler(ex).ConfigureAwait(false); } } @@ -256,16 +211,6 @@ await _copyToDestinationFile( } } - private async Task InvokeFailedEvent(Exception ex) - { - foreach (Stream chunkStream in _pendingChunks.Values) - { - chunkStream.Dispose(); - } - _pendingChunks.Clear(); - await _invokeFailedEventHandler(ex).ConfigureAwait(false); - } - /// /// Moves the downloader to the next range and updates/reports bytes transferred. /// diff --git a/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs b/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs index 4bc1110f03bff..3b70d4bc087cd 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs @@ -395,39 +395,19 @@ internal async Task DownloadStreamingInternal(HttpRange range) _cancellationToken).ConfigureAwait(false); // The chunk handler may have been disposed in failure case - if (_downloadChunkHandler != null) - { - await _downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( - transferId: _dataTransfer.Id, - success: true, - offset: range.Offset, - bytesTransferred: (long)range.Length, - result: result.Content, - exception: default, - false, - _cancellationToken)).ConfigureAwait(false); - } + _downloadChunkHandler?.QueueChunk(new DownloadRangeEventArgs( + transferId: _dataTransfer.Id, + success: true, + offset: range.Offset, + bytesTransferred: (long)range.Length, + result: result.Content, + exception: default, + false, + _cancellationToken)); } catch (Exception ex) { - if (_downloadChunkHandler != null) - { - await _downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( - transferId: _dataTransfer.Id, - success: false, - offset: range.Offset, - bytesTransferred: (long)range.Length, - result: default, - exception: ex, - false, - _cancellationToken)).ConfigureAwait(false); - } - else - { - // If the _downloadChunkHandler has been disposed before we call to it - // we should at least filter the exception to error handling just in case. - await InvokeFailedArgAsync(ex).ConfigureAwait(false); - } + await InvokeFailedArgAsync(ex).ConfigureAwait(false); } } @@ -474,17 +454,16 @@ internal DownloadChunkHandler GetDownloadChunkHandler( expectedLength, ranges, GetDownloadChunkHandlerBehaviors(jobPart), - ClientDiagnostics, _cancellationToken); - internal static DownloadChunkHandler.Behaviors GetDownloadChunkHandlerBehaviors(UriToStreamJobPart job) + private static DownloadChunkHandler.Behaviors GetDownloadChunkHandlerBehaviors(UriToStreamJobPart jobPart) { return new DownloadChunkHandler.Behaviors() { - CopyToDestinationFile = job.CopyToStreamInternal, - ReportProgressInBytes = job.ReportBytesWritten, - InvokeFailedHandler = job.InvokeFailedArgAsync, - QueueCompleteFileDownload = job.QueueCompleteFileDownload + CopyToDestinationFile = jobPart.CopyToStreamInternal, + ReportProgressInBytes = jobPart.ReportBytesWritten, + InvokeFailedHandler = jobPart.InvokeFailedArgAsync, + QueueCompleteFileDownload = jobPart.QueueCompleteFileDownload }; } diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/DownloadChunkHandlerTests.cs b/sdk/storage/Azure.Storage.DataMovement/tests/DownloadChunkHandlerTests.cs index 8b58aabaef9cf..2fe7514b3c31e 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/DownloadChunkHandlerTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/DownloadChunkHandlerTests.cs @@ -10,7 +10,6 @@ using System.Threading; using Azure.Core; using Azure.Storage.Tests.Shared; -using Azure.Core.Pipeline; using Azure.Storage.Common; namespace Azure.Storage.DataMovement.Tests @@ -26,8 +25,6 @@ public DownloadChunkHandlerTests() { } private readonly string _reportProgressInBytesMsg = "Amount of Progress amount calls were incorrect."; private readonly string _completeFileDownloadMsg = "Complete File Download call amount calls were incorrect."; - private ClientDiagnostics ClientDiagnostics => new(ClientOptions.Default); - private void VerifyDelegateInvocations( MockDownloadChunkBehaviors behaviors, int expectedFailureCount, @@ -174,7 +171,7 @@ private List GetRanges(long blockSize, long expectedLength) [TestCase(Constants.KB)] [TestCase(Constants.MB)] [TestCase(4 * Constants.MB)] - public async Task OneChunkTransfer(long blockSize) + public void OneChunkTransfer(long blockSize) { // Arrange - Set up tasks MockDownloadChunkBehaviors mockBehaviors = GetMockDownloadChunkBehaviors(); @@ -194,13 +191,12 @@ public async Task OneChunkTransfer(long blockSize) ReportProgressInBytes = mockBehaviors.ReportProgressInBytesTask.Object, InvokeFailedHandler = mockBehaviors.InvokeFailedEventHandlerTask.Object, }, - ClientDiagnostics, CancellationToken.None); PredictableStream content = new PredictableStream(blockSize); // Act - Make one chunk that would meet the expected length - await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( + downloadChunkHandler.QueueChunk(new DownloadRangeEventArgs( transferId: "fake-id", success: true, offset: 0, @@ -222,7 +218,7 @@ await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( [Test] [TestCase(512)] [TestCase(Constants.KB)] - public async Task MultipleChunkTransfer(long blockSize) + public void MultipleChunkTransfer(long blockSize) { // Arrange - Set up tasks MockDownloadChunkBehaviors mockBehaviors = GetMockDownloadChunkBehaviors(); @@ -239,13 +235,12 @@ public async Task MultipleChunkTransfer(long blockSize) ReportProgressInBytes = mockBehaviors.ReportProgressInBytesTask.Object, InvokeFailedHandler = mockBehaviors.InvokeFailedEventHandlerTask.Object, }, - ClientDiagnostics, cancellationToken: CancellationToken.None); PredictableStream content = new PredictableStream(blockSize); // Act - Make one chunk that would update the bytes but not cause a commit block list to occur - await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( + downloadChunkHandler.QueueChunk(new DownloadRangeEventArgs( transferId: "fake-id", success: true, offset: 0, @@ -266,7 +261,7 @@ await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( PredictableStream content2 = new PredictableStream(blockSize); // Act - Now add the last block to meet the required commited block amount. - await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( + downloadChunkHandler.QueueChunk(new DownloadRangeEventArgs( transferId: "fake-id", success: true, offset: blockSize, @@ -288,7 +283,7 @@ await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( [Test] [TestCase(512)] [TestCase(Constants.KB)] - public async Task MultipleChunkTransfer_UnexpectedOffsetError(long blockSize) + public void MultipleChunkTransfer_UnexpectedOffsetError(long blockSize) { // Arrange - Set up tasks MockDownloadChunkBehaviors mockBehaviors = GetMockDownloadChunkBehaviors(); @@ -304,13 +299,12 @@ public async Task MultipleChunkTransfer_UnexpectedOffsetError(long blockSize) QueueCompleteFileDownload = mockBehaviors.QueueCompleteFileDownloadTask.Object, ReportProgressInBytes = mockBehaviors.ReportProgressInBytesTask.Object, InvokeFailedHandler = mockBehaviors.InvokeFailedEventHandlerTask.Object }, - ClientDiagnostics, cancellationToken: CancellationToken.None); PredictableStream content = new PredictableStream(blockSize); // Make initial range event - await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( + downloadChunkHandler.QueueChunk(new DownloadRangeEventArgs( transferId: "fake-id", success: true, offset: 0, @@ -321,7 +315,7 @@ await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( cancellationToken: CancellationToken.None)); // Act - Make the repeat at the same offset to cause an error. - await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( + downloadChunkHandler.QueueChunk(new DownloadRangeEventArgs( transferId: "fake-id", success: true, offset: 0, @@ -343,7 +337,7 @@ await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( [Test] [TestCase(512)] [TestCase(Constants.KB)] - public async Task MultipleChunkTransfer_EarlyChunks(long blockSize) + public void MultipleChunkTransfer_EarlyChunks(long blockSize) { // Arrange - Set up tasks MockDownloadChunkBehaviors mockBehaviors = GetMockDownloadChunkBehaviors(); @@ -361,13 +355,12 @@ public async Task MultipleChunkTransfer_EarlyChunks(long blockSize) ReportProgressInBytes = mockBehaviors.ReportProgressInBytesTask.Object, InvokeFailedHandler = mockBehaviors.InvokeFailedEventHandlerTask.Object, }, - ClientDiagnostics, cancellationToken: CancellationToken.None); PredictableStream content = new PredictableStream(blockSize); // Act - The second chunk returns first - await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( + downloadChunkHandler.QueueChunk(new DownloadRangeEventArgs( transferId: "fake-id", success: true, offset: blockSize, @@ -386,7 +379,7 @@ await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( expectedCompleteFileCount: 0); // Act - The first chunk is then returned - await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( + downloadChunkHandler.QueueChunk(new DownloadRangeEventArgs( transferId: "fake-id", success: true, offset: 0, @@ -427,7 +420,6 @@ public async Task MultipleChunkTransfer_MultipleProcesses(long blockSize, int ta ReportProgressInBytes = mockBehaviors.ReportProgressInBytesTask.Object, InvokeFailedHandler = mockBehaviors.InvokeFailedEventHandlerTask.Object, }, - ClientDiagnostics, cancellationToken: CancellationToken.None); List runningTasks = new List(); @@ -437,15 +429,16 @@ public async Task MultipleChunkTransfer_MultipleProcesses(long blockSize, int ta { PredictableStream content = new PredictableStream(blockSize); - Task task = downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( + long offset = i * blockSize; + Task task = Task.Run(() => downloadChunkHandler.QueueChunk(new DownloadRangeEventArgs( transferId: "fake-id", success: true, - offset: i * blockSize, + offset: offset, bytesTransferred: blockSize, result: content, exception: default, isRunningSynchronously: false, - cancellationToken: CancellationToken.None)); + cancellationToken: CancellationToken.None))); runningTasks.Add(task); } @@ -463,7 +456,7 @@ public async Task MultipleChunkTransfer_MultipleProcesses(long blockSize, int ta } [Test] - public async Task GetCopyToDestinationFileTask_ExpectedFailure() + public void GetCopyToDestinationFileTask_ExpectedFailure() { // Arrange MockDownloadChunkBehaviors mockBehaviors = GetMockDownloadChunkBehaviors(); @@ -483,13 +476,12 @@ public async Task GetCopyToDestinationFileTask_ExpectedFailure() ReportProgressInBytes = mockBehaviors.ReportProgressInBytesTask.Object, InvokeFailedHandler = mockBehaviors.InvokeFailedEventHandlerTask.Object, }, - ClientDiagnostics, cancellationToken: CancellationToken.None); PredictableStream content = new PredictableStream(blockSize); // Act - await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( + downloadChunkHandler.QueueChunk(new DownloadRangeEventArgs( transferId: "fake-id", success: true, offset: 0, @@ -509,7 +501,7 @@ await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( } [Test] - public async Task QueueCompleteFileDownloadTask_ExpectedFailure() + public void QueueCompleteFileDownloadTask_ExpectedFailure() { // Arrange MockDownloadChunkBehaviors mockBehaviors = GetMockDownloadChunkBehaviors(); @@ -528,13 +520,12 @@ public async Task QueueCompleteFileDownloadTask_ExpectedFailure() ReportProgressInBytes = mockBehaviors.ReportProgressInBytesTask.Object, InvokeFailedHandler = mockBehaviors.InvokeFailedEventHandlerTask.Object, }, - ClientDiagnostics, cancellationToken: CancellationToken.None); PredictableStream content = new PredictableStream(blockSize); // Act - await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( + downloadChunkHandler.QueueChunk(new DownloadRangeEventArgs( transferId: "fake-id", success: true, offset: 0, @@ -554,7 +545,7 @@ await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( } [Test] - public async Task DisposedEventHandler() + public void DisposedEventHandler() { // Arrange - Create DownloadChunkHandler then Dispose it so the event handler is disposed MockDownloadChunkBehaviors mockBehaviors = GetMockDownloadChunkBehaviors(); @@ -573,14 +564,11 @@ public async Task DisposedEventHandler() ReportProgressInBytes = mockBehaviors.ReportProgressInBytesTask.Object, InvokeFailedHandler = mockBehaviors.InvokeFailedEventHandlerTask.Object, }, - ClientDiagnostics, cancellationToken: CancellationToken.None); // Act downloadChunkHandler.Dispose(); - - // Assert - Do not throw when trying to invoke the event handler when disposed - await downloadChunkHandler.InvokeEvent(default); + downloadChunkHandler.QueueChunk(default); VerifyDelegateInvocations( behaviors: mockBehaviors, From 2bb95d4905b207f76cbf366b8fb6468a08331e11 Mon Sep 17 00:00:00 2001 From: Jacob Lauzon Date: Thu, 14 Nov 2024 18:33:48 -0800 Subject: [PATCH 04/12] Refactor args --- .../src/DownloadChunkHandler.cs | 11 +- .../src/DownloadRangeEventArgs.cs | 74 ------------ .../src/QueueDownloadChunkArgs.cs | 24 ++++ .../src/UriToStreamJobPart.cs | 11 +- .../tests/DownloadChunkHandlerTests.cs | 110 +++++------------- 5 files changed, 62 insertions(+), 168 deletions(-) delete mode 100644 sdk/storage/Azure.Storage.DataMovement/src/DownloadRangeEventArgs.cs create mode 100644 sdk/storage/Azure.Storage.DataMovement/src/QueueDownloadChunkArgs.cs diff --git a/sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs b/sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs index 230d4dc1c8873..b87cc051dcdfd 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs @@ -7,7 +7,6 @@ using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; -using Azure.Core.Pipeline; using Azure.Storage.Common; namespace Azure.Storage.DataMovement @@ -35,10 +34,10 @@ public struct Behaviors } /// - /// Create channel of to keep track to handle + /// Create channel of to keep track to handle /// writing downloaded chunks to the destination as well as tracking overall progress. /// - private readonly Channel _downloadRangeChannel; + private readonly Channel _downloadRangeChannel; private readonly Task _processDownloadRangeEvents; private readonly CancellationToken _cancellationToken; @@ -94,7 +93,7 @@ public DownloadChunkHandler( // The size of the channel should never exceed 50k (limit on blocks in a block blob). // and that's in the worst case that we never read from the channel and had a maximum chunk blob. - _downloadRangeChannel = Channel.CreateUnbounded( + _downloadRangeChannel = Channel.CreateUnbounded( new UnboundedChannelOptions() { // Single reader is required as we can only have one writer to the destination. @@ -135,7 +134,7 @@ public void Dispose() _downloadRangeChannel.Writer.TryComplete(); } - public void QueueChunk(DownloadRangeEventArgs args) + public void QueueChunk(QueueDownloadChunkArgs args) { _downloadRangeChannel.Writer.TryWrite(args); } @@ -147,7 +146,7 @@ private async Task NotifyOfPendingChunkDownloadEvents() while (await _downloadRangeChannel.Reader.WaitToReadAsync(_cancellationToken).ConfigureAwait(false)) { // Read one event argument at a time. - DownloadRangeEventArgs args = await _downloadRangeChannel.Reader.ReadAsync(_cancellationToken).ConfigureAwait(false); + QueueDownloadChunkArgs args = await _downloadRangeChannel.Reader.ReadAsync(_cancellationToken).ConfigureAwait(false); long currentRangeOffset = _ranges[_currentRangeIndex].Offset; if (currentRangeOffset < args.Offset) { diff --git a/sdk/storage/Azure.Storage.DataMovement/src/DownloadRangeEventArgs.cs b/sdk/storage/Azure.Storage.DataMovement/src/DownloadRangeEventArgs.cs deleted file mode 100644 index 5e3d64a0e1fdb..0000000000000 --- a/sdk/storage/Azure.Storage.DataMovement/src/DownloadRangeEventArgs.cs +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -using System; -using System.IO; -using System.Threading; -using Azure.Storage.Common; - -namespace Azure.Storage.DataMovement -{ - internal class DownloadRangeEventArgs : DataTransferEventArgs - { - public bool Success { get; } - - public long Offset { get; } - - /// - /// Will be 0 if Success is false - /// - public long Length { get; } - - /// - /// Stream results of the range downloaded if Sucess is true - /// - public Stream Result { get; } - - /// - /// If is false, this value will be populated - /// with the exception that was thrown. - /// - public Exception Exception { get; } - - /// - /// Constructor - /// - /// - /// Id of the transfer - /// - /// - /// Whether or not the download range call was successful - /// - /// - /// - /// - /// - /// - /// - public DownloadRangeEventArgs( - string transferId, - bool success, - long offset, - long bytesTransferred, - Stream result, - Exception exception, - bool isRunningSynchronously, - CancellationToken cancellationToken) : - base(transferId, isRunningSynchronously, cancellationToken) - { - if (success && exception != null) - { - Argument.AssertNull(exception, nameof(exception)); - } - else if (!success && exception == null) - { - Argument.AssertNotNull(exception, nameof(exception)); - } - Success = success; - Offset = offset; - Length = bytesTransferred; - Result = result; - Exception = exception; - } - } -} diff --git a/sdk/storage/Azure.Storage.DataMovement/src/QueueDownloadChunkArgs.cs b/sdk/storage/Azure.Storage.DataMovement/src/QueueDownloadChunkArgs.cs new file mode 100644 index 0000000000000..c77b5006cbf00 --- /dev/null +++ b/sdk/storage/Azure.Storage.DataMovement/src/QueueDownloadChunkArgs.cs @@ -0,0 +1,24 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.IO; + +namespace Azure.Storage.DataMovement +{ + internal class QueueDownloadChunkArgs + { + public long Offset { get; } + public long Length { get; } + public Stream Result { get; } + + public QueueDownloadChunkArgs( + long offset, + long length, + Stream result) + { + Offset = offset; + Length = length; + Result = result; + } + } +} diff --git a/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs b/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs index 3b70d4bc087cd..52a2a18dd3140 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs @@ -395,15 +395,10 @@ internal async Task DownloadStreamingInternal(HttpRange range) _cancellationToken).ConfigureAwait(false); // The chunk handler may have been disposed in failure case - _downloadChunkHandler?.QueueChunk(new DownloadRangeEventArgs( - transferId: _dataTransfer.Id, - success: true, + _downloadChunkHandler?.QueueChunk(new QueueDownloadChunkArgs( offset: range.Offset, - bytesTransferred: (long)range.Length, - result: result.Content, - exception: default, - false, - _cancellationToken)); + length: (long)range.Length, + result: result.Content)); } catch (Exception ex) { diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/DownloadChunkHandlerTests.cs b/sdk/storage/Azure.Storage.DataMovement/tests/DownloadChunkHandlerTests.cs index 2fe7514b3c31e..4349036da1d2f 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/DownloadChunkHandlerTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/DownloadChunkHandlerTests.cs @@ -196,15 +196,10 @@ public void OneChunkTransfer(long blockSize) PredictableStream content = new PredictableStream(blockSize); // Act - Make one chunk that would meet the expected length - downloadChunkHandler.QueueChunk(new DownloadRangeEventArgs( - transferId: "fake-id", - success: true, + downloadChunkHandler.QueueChunk(new QueueDownloadChunkArgs( offset: 0, - bytesTransferred: blockSize, - result: content, - exception: default, - isRunningSynchronously: false, - cancellationToken: CancellationToken.None)); + length: blockSize, + result: content)); // Assert VerifyDelegateInvocations( @@ -240,15 +235,10 @@ public void MultipleChunkTransfer(long blockSize) PredictableStream content = new PredictableStream(blockSize); // Act - Make one chunk that would update the bytes but not cause a commit block list to occur - downloadChunkHandler.QueueChunk(new DownloadRangeEventArgs( - transferId: "fake-id", - success: true, + downloadChunkHandler.QueueChunk(new QueueDownloadChunkArgs( offset: 0, - bytesTransferred: blockSize, - result: content, - exception: default, - isRunningSynchronously: false, - cancellationToken: CancellationToken.None)); + length: blockSize, + result: content)); // Assert VerifyDelegateInvocations( @@ -261,15 +251,10 @@ public void MultipleChunkTransfer(long blockSize) PredictableStream content2 = new PredictableStream(blockSize); // Act - Now add the last block to meet the required commited block amount. - downloadChunkHandler.QueueChunk(new DownloadRangeEventArgs( - transferId: "fake-id", - success: true, + downloadChunkHandler.QueueChunk(new QueueDownloadChunkArgs( offset: blockSize, - bytesTransferred: blockSize, - result: content2, - exception: default, - isRunningSynchronously: false, - cancellationToken: CancellationToken.None)); + length: blockSize, + result: content2)); // Assert VerifyDelegateInvocations( @@ -304,26 +289,16 @@ public void MultipleChunkTransfer_UnexpectedOffsetError(long blockSize) PredictableStream content = new PredictableStream(blockSize); // Make initial range event - downloadChunkHandler.QueueChunk(new DownloadRangeEventArgs( - transferId: "fake-id", - success: true, + downloadChunkHandler.QueueChunk(new QueueDownloadChunkArgs( offset: 0, - bytesTransferred: blockSize, - result: content, - exception: default, - isRunningSynchronously: false, - cancellationToken: CancellationToken.None)); + length: blockSize, + result: content)); // Act - Make the repeat at the same offset to cause an error. - downloadChunkHandler.QueueChunk(new DownloadRangeEventArgs( - transferId: "fake-id", - success: true, + downloadChunkHandler.QueueChunk(new QueueDownloadChunkArgs( offset: 0, - bytesTransferred: blockSize, - result: content, - exception: default, - isRunningSynchronously: false, - cancellationToken: CancellationToken.None)); + length: blockSize, + result: content)); // Assert VerifyDelegateInvocations( @@ -360,15 +335,10 @@ public void MultipleChunkTransfer_EarlyChunks(long blockSize) PredictableStream content = new PredictableStream(blockSize); // Act - The second chunk returns first - downloadChunkHandler.QueueChunk(new DownloadRangeEventArgs( - transferId: "fake-id", - success: true, + downloadChunkHandler.QueueChunk(new QueueDownloadChunkArgs( offset: blockSize, - bytesTransferred: blockSize, - result: content, - exception: default, - isRunningSynchronously: false, - cancellationToken: CancellationToken.None)); + length: blockSize, + result: content)); // Assert VerifyDelegateInvocations( @@ -379,15 +349,10 @@ public void MultipleChunkTransfer_EarlyChunks(long blockSize) expectedCompleteFileCount: 0); // Act - The first chunk is then returned - downloadChunkHandler.QueueChunk(new DownloadRangeEventArgs( - transferId: "fake-id", - success: true, + downloadChunkHandler.QueueChunk(new QueueDownloadChunkArgs( offset: 0, - bytesTransferred: blockSize, - result: content, - exception: default, - isRunningSynchronously: false, - cancellationToken: CancellationToken.None)); + length: blockSize, + result: content)); // Assert VerifyDelegateInvocations( @@ -430,15 +395,10 @@ public async Task MultipleChunkTransfer_MultipleProcesses(long blockSize, int ta PredictableStream content = new PredictableStream(blockSize); long offset = i * blockSize; - Task task = Task.Run(() => downloadChunkHandler.QueueChunk(new DownloadRangeEventArgs( - transferId: "fake-id", - success: true, + Task task = Task.Run(() => downloadChunkHandler.QueueChunk(new QueueDownloadChunkArgs( offset: offset, - bytesTransferred: blockSize, - result: content, - exception: default, - isRunningSynchronously: false, - cancellationToken: CancellationToken.None))); + length: blockSize, + result: content))); runningTasks.Add(task); } @@ -481,15 +441,10 @@ public void GetCopyToDestinationFileTask_ExpectedFailure() PredictableStream content = new PredictableStream(blockSize); // Act - downloadChunkHandler.QueueChunk(new DownloadRangeEventArgs( - transferId: "fake-id", - success: true, + downloadChunkHandler.QueueChunk(new QueueDownloadChunkArgs( offset: 0, - bytesTransferred: blockSize, - result: content, - exception: default, - isRunningSynchronously: false, - cancellationToken: CancellationToken.None)); + length: blockSize, + result: content)); // Assert VerifyDelegateInvocations( @@ -525,15 +480,10 @@ public void QueueCompleteFileDownloadTask_ExpectedFailure() PredictableStream content = new PredictableStream(blockSize); // Act - downloadChunkHandler.QueueChunk(new DownloadRangeEventArgs( - transferId: "fake-id", - success: true, + downloadChunkHandler.QueueChunk(new QueueDownloadChunkArgs( offset: 0, - bytesTransferred: blockSize, - result: content, - exception: default, - isRunningSynchronously: false, - cancellationToken: CancellationToken.None)); + length: blockSize, + result: content)); // Assert VerifyDelegateInvocations( From 423cb35c03c269c984529db573aca94ca06c18d6 Mon Sep 17 00:00:00 2001 From: Jacob Lauzon Date: Thu, 14 Nov 2024 18:39:45 -0800 Subject: [PATCH 05/12] Enumerable --- .../src/UriToStreamJobPart.cs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs b/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs index 52a2a18dd3140..9eb3c70b1d3ae 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs @@ -8,6 +8,7 @@ using System.Threading; using Azure.Core; using Azure.Storage.Common; +using System.Linq; namespace Azure.Storage.DataMovement { @@ -218,9 +219,9 @@ public override async Task ProcessPartToChunkAsync() internal async Task UnknownDownloadInternal() { Task initialTask = _sourceResource.ReadStreamAsync( - position: 0, - length: _initialTransferSize, - _cancellationToken); + position: 0, + length: _initialTransferSize, + _cancellationToken); try { @@ -325,7 +326,7 @@ internal async Task LengthKnownDownloadInternal() private async Task QueueChunksToChannel(long initialLength, long totalLength) { // Get list of ranges of the blob - IList ranges = GetRangesList(initialLength, totalLength, _transferChunkSize); + IList ranges = GetRanges(initialLength, totalLength, _transferChunkSize).ToList(); // Create Download Chunk event handler to manage when the ranges finish downloading _downloadChunkHandler = GetDownloadChunkHandler( @@ -467,14 +468,12 @@ private Task QueueCompleteFileDownload() return QueueChunkToChannelAsync(CompleteFileDownload); } - private static IList GetRangesList(long initialLength, long totalLength, long rangeSize) + private static IEnumerable GetRanges(long initialLength, long totalLength, long rangeSize) { - IList list = new List(); for (long offset = initialLength; offset < totalLength; offset += rangeSize) { - list.Add(new HttpRange(offset, Math.Min(totalLength - offset, rangeSize))); + yield return new HttpRange(offset, Math.Min(totalLength - offset, rangeSize)); } - return list; } #endregion PartitionedDownloader From 9e863e6fd02b1eb66118488e0f9925e491ecd9b1 Mon Sep 17 00:00:00 2001 From: Jacob Lauzon Date: Sun, 17 Nov 2024 17:12:25 -0800 Subject: [PATCH 06/12] No more pending chunks! --- .../src/DownloadChunkHandler.cs | 99 +++------------ .../src/LocalFileStorageResource.cs | 8 +- .../StorageResourceWriteToOffsetOptions.cs | 5 + .../src/UriToStreamJobPart.cs | 36 ++---- .../tests/DownloadChunkHandlerTests.cs | 120 ++++-------------- 5 files changed, 62 insertions(+), 206 deletions(-) diff --git a/sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs b/sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs index b87cc051dcdfd..17886dbff61ec 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs @@ -2,7 +2,6 @@ // Licensed under the MIT License. using System; -using System.Collections.Generic; using System.IO; using System.Threading; using System.Threading.Channels; @@ -14,7 +13,7 @@ namespace Azure.Storage.DataMovement internal class DownloadChunkHandler : IDisposable { #region Delegate Definitions - public delegate Task CopyToDestinationFileInternal(long offset, long length, Stream stream, long expectedLength); + public delegate Task CopyToDestinationFileInternal(long offset, long length, Stream stream, long expectedLength, bool initial); public delegate void ReportProgressInBytes(long bytesWritten); public delegate Task QueueCompleteFileDownloadInternal(); public delegate Task InvokeFailedEventHandlerInternal(Exception ex); @@ -43,21 +42,7 @@ public struct Behaviors private long _bytesTransferred; private readonly long _expectedLength; - - /// - /// List that holds all ranges of chunks to process. - /// - private readonly IList _ranges; - /// - /// Holds which range we are currently waiting on to download. - /// - private int _currentRangeIndex; - - /// - /// If any download chunks come in early before the chunk before it - /// to copy to the file, let's hold it in order here before we copy it over. - /// - private readonly Dictionary _pendingChunks; + private int _chunkTransferred; /// /// The controller for downloading the chunks to each file. @@ -68,9 +53,6 @@ public struct Behaviors /// /// The expected length of the content to be downloaded in bytes. /// - /// - /// List that holds the expected ranges the chunk ranges will come back as. - /// /// /// Contains all the supported function calls. /// @@ -82,14 +64,13 @@ public struct Behaviors public DownloadChunkHandler( long currentTransferred, long expectedLength, - IList ranges, Behaviors behaviors, CancellationToken cancellationToken) { // Set bytes transferred to the length of bytes we got back from the initial // download request _bytesTransferred = currentTransferred; - _currentRangeIndex = 0; + _chunkTransferred = 0; // The size of the channel should never exceed 50k (limit on blocks in a block blob). // and that's in the worst case that we never read from the channel and had a maximum chunk blob. @@ -103,14 +84,11 @@ public DownloadChunkHandler( _cancellationToken = cancellationToken; _expectedLength = expectedLength; - _ranges = ranges; - _pendingChunks = new(); if (expectedLength <= 0) { throw Errors.InvalidExpectedLength(expectedLength); } - Argument.AssertNotNullOrEmpty(ranges, nameof(ranges)); Argument.AssertNotNull(behaviors, nameof(behaviors)); // Set values @@ -126,11 +104,6 @@ public DownloadChunkHandler( public void Dispose() { - foreach (Stream chunkStream in _pendingChunks.Values) - { - chunkStream.Dispose(); - } - _pendingChunks.Clear(); _downloadRangeChannel.Writer.TryComplete(); } @@ -147,40 +120,23 @@ private async Task NotifyOfPendingChunkDownloadEvents() { // Read one event argument at a time. QueueDownloadChunkArgs args = await _downloadRangeChannel.Reader.ReadAsync(_cancellationToken).ConfigureAwait(false); - long currentRangeOffset = _ranges[_currentRangeIndex].Offset; - if (currentRangeOffset < args.Offset) - { - // Early chunk, add to pending - _pendingChunks.Add(args.Offset, args.Result); - } - else if (currentRangeOffset == args.Offset) + + // Copy the current chunk to the destination + using (Stream content = args.Result) { - // Copy the current chunk to the destination - using (Stream content = args.Result) - { - await _copyToDestinationFile( - args.Offset, - args.Length, - content, - _expectedLength).ConfigureAwait(false); - } - UpdateBytesAndRange(args.Length); - - // Check if the next chunks are already downloaeded and copy those - await AppendPendingChunks().ConfigureAwait(false); - - // Check if we finished downloading the blob - if (_bytesTransferred == _expectedLength) - { - await _queueCompleteFileDownload().ConfigureAwait(false); - } + await _copyToDestinationFile( + args.Offset, + args.Length, + content, + _expectedLength, + initial: _chunkTransferred == 0).ConfigureAwait(false); } - else + UpdateBytesAndRange(args.Length); + + //Check if we finished downloading the blob + if (_bytesTransferred == _expectedLength) { - // We should never reach this point because that means - // the range that came back was less than the next range that is supposed - // to be copied to the file - throw Errors.InvalidDownloadOffset(args.Offset, args.Length); + await _queueCompleteFileDownload().ConfigureAwait(false); } } } @@ -191,32 +147,13 @@ await _copyToDestinationFile( } } - private async Task AppendPendingChunks() - { - while (_currentRangeIndex < _ranges.Count && - _pendingChunks.ContainsKey(_ranges[_currentRangeIndex].Offset)) - { - HttpRange currentRange = _ranges[_currentRangeIndex]; - using (Stream nextChunk = _pendingChunks[currentRange.Offset]) - { - await _copyToDestinationFile( - currentRange.Offset, - currentRange.Length.Value, - nextChunk, - _expectedLength).ConfigureAwait(false); - } - _pendingChunks.Remove(currentRange.Offset); - UpdateBytesAndRange((long)currentRange.Length); - } - } - /// /// Moves the downloader to the next range and updates/reports bytes transferred. /// /// private void UpdateBytesAndRange(long bytesDownloaded) { - _currentRangeIndex++; + _chunkTransferred++; _bytesTransferred += bytesDownloaded; _reportProgressInBytes(bytesDownloaded); } diff --git a/sdk/storage/Azure.Storage.DataMovement/src/LocalFileStorageResource.cs b/sdk/storage/Azure.Storage.DataMovement/src/LocalFileStorageResource.cs index 1d13dfeed1238..68e77fb501b96 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/LocalFileStorageResource.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/LocalFileStorageResource.cs @@ -126,7 +126,7 @@ protected internal override async Task CopyFromStreamAsync( CancellationHelper.ThrowIfCancellationRequested(cancellationToken); long position = options?.Position != default ? options.Position.Value : 0; - if (position == 0) + if (options?.Initial == true) { Create(overwrite); } @@ -134,9 +134,9 @@ protected internal override async Task CopyFromStreamAsync( { // Appends incoming stream to the local file resource using (FileStream fileStream = new FileStream( - _uri.LocalPath, - FileMode.OpenOrCreate, - FileAccess.Write)) + _uri.LocalPath, + FileMode.Open, + FileAccess.Write)) { if (position > 0) { diff --git a/sdk/storage/Azure.Storage.DataMovement/src/StorageResourceWriteToOffsetOptions.cs b/sdk/storage/Azure.Storage.DataMovement/src/StorageResourceWriteToOffsetOptions.cs index a041f20a07f6f..9d5bd235726df 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/StorageResourceWriteToOffsetOptions.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/StorageResourceWriteToOffsetOptions.cs @@ -24,6 +24,11 @@ public class StorageResourceWriteToOffsetOptions /// public long? Position { get; set; } + /// + /// Optional. Specifies whether this write is for the initial chunk. + /// + public bool Initial { get; set; } + /// /// Optional. Specifies the source properties to set in the destination. /// diff --git a/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs b/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs index 9eb3c70b1d3ae..d85630d43f7a2 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs @@ -255,7 +255,8 @@ internal async Task UnknownDownloadInternal() offset: 0, sourceLength: initialLength.Value, source: initialResult.Content, - expectedLength: totalLength).ConfigureAwait(false); + expectedLength: totalLength, + initial: true).ConfigureAwait(false); if (successfulInitialCopy) { ReportBytesWritten(initialLength.Value); @@ -307,7 +308,8 @@ internal async Task LengthKnownDownloadInternal() offset: 0, sourceLength: downloadLength, source: result.Content, - expectedLength: totalLength).ConfigureAwait(false); + expectedLength: totalLength, + initial: true).ConfigureAwait(false); if (successfulCopy) { ReportBytesWritten(downloadLength); @@ -325,20 +327,17 @@ internal async Task LengthKnownDownloadInternal() #region PartitionedDownloader private async Task QueueChunksToChannel(long initialLength, long totalLength) { - // Get list of ranges of the blob - IList ranges = GetRanges(initialLength, totalLength, _transferChunkSize).ToList(); - // Create Download Chunk event handler to manage when the ranges finish downloading - _downloadChunkHandler = GetDownloadChunkHandler( + _downloadChunkHandler = new DownloadChunkHandler( currentTransferred: initialLength, expectedLength: totalLength, - ranges: ranges, - jobPart: this); + GetDownloadChunkHandlerBehaviors(this), + _cancellationToken); // Fill the queue with tasks to download each of the remaining // ranges in the file _queueingTasks = true; - foreach (HttpRange httpRange in ranges) + foreach (HttpRange httpRange in GetRanges(initialLength, totalLength, _transferChunkSize)) { if (_cancellationToken.IsCancellationRequested) { @@ -411,7 +410,8 @@ public async Task CopyToStreamInternal( long offset, long sourceLength, Stream source, - long expectedLength) + long expectedLength, + bool initial) { CancellationHelper.ThrowIfCancellationRequested(_cancellationToken); @@ -426,6 +426,7 @@ await _destinationResource.CopyFromStreamAsync( options: new StorageResourceWriteToOffsetOptions() { Position = offset, + Initial = initial, }, cancellationToken: _cancellationToken).ConfigureAwait(false); return true; @@ -440,18 +441,6 @@ await _destinationResource.CopyFromStreamAsync( return false; } - internal DownloadChunkHandler GetDownloadChunkHandler( - long currentTransferred, - long expectedLength, - IList ranges, - UriToStreamJobPart jobPart) - => new DownloadChunkHandler( - currentTransferred, - expectedLength, - ranges, - GetDownloadChunkHandlerBehaviors(jobPart), - _cancellationToken); - private static DownloadChunkHandler.Behaviors GetDownloadChunkHandlerBehaviors(UriToStreamJobPart jobPart) { return new DownloadChunkHandler.Behaviors() @@ -505,7 +494,8 @@ private async Task CreateZeroLengthDownload() offset: 0, sourceLength: 0, source: default, - expectedLength: 0).ConfigureAwait(false); + expectedLength: 0, + initial: true).ConfigureAwait(false); if (successfulCreation) { // Queue the work to end the download diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/DownloadChunkHandlerTests.cs b/sdk/storage/Azure.Storage.DataMovement/tests/DownloadChunkHandlerTests.cs index 4349036da1d2f..4710762d60680 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/DownloadChunkHandlerTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/DownloadChunkHandlerTests.cs @@ -10,7 +10,6 @@ using System.Threading; using Azure.Core; using Azure.Storage.Tests.Shared; -using Azure.Storage.Common; namespace Azure.Storage.DataMovement.Tests { @@ -71,12 +70,27 @@ private void VerifyDelegateInvocations( $"Current Complete Download Invocations: {currentCompleteDownloadCount} | Expected: {expectedCompleteFileCount}"; Assert.Fail(message); } + + // Assert the first call to copy to the destination always specifies initial and the rest don't + int count = 0; + foreach (IInvocation invocation in behaviors.CopyToDestinationFileTask.Invocations) + { + if (count == 0) + { + Assert.That((bool)invocation.Arguments[4], Is.True); + } + else + { + Assert.That((bool)invocation.Arguments[4], Is.False); + } + count++; + } } private Mock GetCopyToDestinationFileTask() { var mock = new Mock(MockBehavior.Strict); - mock.Setup(del => del(It.IsNotNull(), It.IsNotNull(), It.IsNotNull(), It.IsNotNull())) + mock.Setup(del => del(It.IsNotNull(), It.IsNotNull(), It.IsNotNull(), It.IsNotNull(), It.IsAny())) .Returns(Task.CompletedTask); return mock; } @@ -84,7 +98,7 @@ private void VerifyDelegateInvocations( private Mock GetExceptionCopyToDestinationFileTask() { var mock = new Mock(MockBehavior.Strict); - mock.Setup(del => del(It.IsNotNull(), It.IsNotNull(), It.IsNotNull(), It.IsNotNull())) + mock.Setup(del => del(It.IsNotNull(), It.IsNotNull(), It.IsNotNull(), It.IsNotNull(), It.IsAny())) .Throws(new UnauthorizedAccessException()); return mock; } @@ -137,35 +151,6 @@ private MockDownloadChunkBehaviors GetMockDownloadChunkBehaviors() InvokeFailedEventHandlerTask = GetInvokeFailedEventHandlerTask() }; - /// - /// Creates ranges that the download chunk handler is expecting. - /// - /// - /// The block size which the size of the range will equal. - /// This value must be less or equal to the expected length. - /// - /// - /// Expected full length of the download to create ranges of. - /// - /// - private List GetRanges(long blockSize, long expectedLength) - { - Argument.AssertNotDefault(ref blockSize, name: nameof(blockSize)); - Argument.AssertNotDefault(ref expectedLength, name: nameof(expectedLength)); - if (expectedLength < blockSize) - { - Argument.AssertInRange(blockSize, expectedLength, default, nameof(blockSize)); - } - List ranges = new List(); - - for (long offset = 0; offset < expectedLength; offset += blockSize) - { - ranges.Add(new HttpRange(offset, Math.Min(expectedLength - offset, blockSize))); - } - - return ranges; - } - [Test] [TestCase(512)] [TestCase(Constants.KB)] @@ -175,15 +160,9 @@ public void OneChunkTransfer(long blockSize) { // Arrange - Set up tasks MockDownloadChunkBehaviors mockBehaviors = GetMockDownloadChunkBehaviors(); - - List ranges = new List() - { - new HttpRange(0, blockSize) - }; using var downloadChunkHandler = new DownloadChunkHandler( currentTransferred: 0, expectedLength: blockSize, - ranges: ranges, new DownloadChunkHandler.Behaviors { CopyToDestinationFile = mockBehaviors.CopyToDestinationFileTask.Object, @@ -218,11 +197,9 @@ public void MultipleChunkTransfer(long blockSize) // Arrange - Set up tasks MockDownloadChunkBehaviors mockBehaviors = GetMockDownloadChunkBehaviors(); long expectedLength = blockSize * 2; - List ranges = GetRanges(blockSize, expectedLength); using var downloadChunkHandler = new DownloadChunkHandler( currentTransferred: 0, expectedLength: expectedLength, - ranges: ranges, behaviors: new DownloadChunkHandler.Behaviors { CopyToDestinationFile = mockBehaviors.CopyToDestinationFileTask.Object, @@ -234,7 +211,7 @@ public void MultipleChunkTransfer(long blockSize) PredictableStream content = new PredictableStream(blockSize); - // Act - Make one chunk that would update the bytes but not cause a commit block list to occur + // Act - First chunk downloadChunkHandler.QueueChunk(new QueueDownloadChunkArgs( offset: 0, length: blockSize, @@ -250,7 +227,7 @@ public void MultipleChunkTransfer(long blockSize) PredictableStream content2 = new PredictableStream(blockSize); - // Act - Now add the last block to meet the required commited block amount. + // Act - Second/final chunk downloadChunkHandler.QueueChunk(new QueueDownloadChunkArgs( offset: blockSize, length: blockSize, @@ -265,50 +242,6 @@ public void MultipleChunkTransfer(long blockSize) expectedCompleteFileCount: 1); } - [Test] - [TestCase(512)] - [TestCase(Constants.KB)] - public void MultipleChunkTransfer_UnexpectedOffsetError(long blockSize) - { - // Arrange - Set up tasks - MockDownloadChunkBehaviors mockBehaviors = GetMockDownloadChunkBehaviors(); - long expectedLength = blockSize * 2; - List ranges = GetRanges(blockSize, expectedLength); - - using var downloadChunkHandler = new DownloadChunkHandler( - currentTransferred: 0, - expectedLength: expectedLength, - ranges: ranges, - behaviors: new DownloadChunkHandler.Behaviors { - CopyToDestinationFile = mockBehaviors.CopyToDestinationFileTask.Object, - QueueCompleteFileDownload = mockBehaviors.QueueCompleteFileDownloadTask.Object, - ReportProgressInBytes = mockBehaviors.ReportProgressInBytesTask.Object, - InvokeFailedHandler = mockBehaviors.InvokeFailedEventHandlerTask.Object }, - cancellationToken: CancellationToken.None); - - PredictableStream content = new PredictableStream(blockSize); - - // Make initial range event - downloadChunkHandler.QueueChunk(new QueueDownloadChunkArgs( - offset: 0, - length: blockSize, - result: content)); - - // Act - Make the repeat at the same offset to cause an error. - downloadChunkHandler.QueueChunk(new QueueDownloadChunkArgs( - offset: 0, - length: blockSize, - result: content)); - - // Assert - VerifyDelegateInvocations( - behaviors: mockBehaviors, - expectedFailureCount: 1, - expectedCopyDestinationCount: 1, - expectedReportProgressCount: 1, - expectedCompleteFileCount: 0); - } - [Test] [TestCase(512)] [TestCase(Constants.KB)] @@ -317,12 +250,10 @@ public void MultipleChunkTransfer_EarlyChunks(long blockSize) // Arrange - Set up tasks MockDownloadChunkBehaviors mockBehaviors = GetMockDownloadChunkBehaviors(); long expectedLength = blockSize * 2; - List ranges = GetRanges(blockSize, expectedLength); using var downloadChunkHandler = new DownloadChunkHandler( currentTransferred: 0, expectedLength: expectedLength, - ranges: ranges, behaviors: new DownloadChunkHandler.Behaviors { CopyToDestinationFile = mockBehaviors.CopyToDestinationFileTask.Object, @@ -344,8 +275,8 @@ public void MultipleChunkTransfer_EarlyChunks(long blockSize) VerifyDelegateInvocations( behaviors: mockBehaviors, expectedFailureCount: 0, - expectedCopyDestinationCount: 0, - expectedReportProgressCount: 0, + expectedCopyDestinationCount: 1, + expectedReportProgressCount: 1, expectedCompleteFileCount: 0); // Act - The first chunk is then returned @@ -373,11 +304,10 @@ public async Task MultipleChunkTransfer_MultipleProcesses(long blockSize, int ta // Arrange - Set up tasks MockDownloadChunkBehaviors mockBehaviors = GetMockDownloadChunkBehaviors(); long expectedLength = blockSize * taskSize; - List ranges = GetRanges(blockSize, expectedLength); + using var downloadChunkHandler = new DownloadChunkHandler( currentTransferred: 0, expectedLength: expectedLength, - ranges: ranges, behaviors: new DownloadChunkHandler.Behaviors { CopyToDestinationFile = mockBehaviors.CopyToDestinationFileTask.Object, @@ -423,12 +353,10 @@ public void GetCopyToDestinationFileTask_ExpectedFailure() mockBehaviors.CopyToDestinationFileTask = GetExceptionCopyToDestinationFileTask(); int blockSize = 512; long expectedLength = blockSize * 2; - List ranges = GetRanges(blockSize, expectedLength); var downloadChunkHandler = new DownloadChunkHandler( currentTransferred: 0, expectedLength: expectedLength, - ranges: ranges, behaviors: new DownloadChunkHandler.Behaviors { CopyToDestinationFile = mockBehaviors.CopyToDestinationFileTask.Object, @@ -462,12 +390,10 @@ public void QueueCompleteFileDownloadTask_ExpectedFailure() MockDownloadChunkBehaviors mockBehaviors = GetMockDownloadChunkBehaviors(); mockBehaviors.QueueCompleteFileDownloadTask = GetExceptionQueueCompleteFileDownloadTask(); int blockSize = 512; - List ranges = GetRanges(blockSize, blockSize); var downloadChunkHandler = new DownloadChunkHandler( currentTransferred: 0, expectedLength: blockSize, - ranges: ranges, behaviors: new DownloadChunkHandler.Behaviors { CopyToDestinationFile = mockBehaviors.CopyToDestinationFileTask.Object, @@ -501,12 +427,10 @@ public void DisposedEventHandler() MockDownloadChunkBehaviors mockBehaviors = GetMockDownloadChunkBehaviors(); int blockSize = 512; long expectedLength = blockSize * 2; - List ranges = GetRanges(blockSize, expectedLength); var downloadChunkHandler = new DownloadChunkHandler( currentTransferred: 0, expectedLength: blockSize, - ranges: ranges, behaviors: new DownloadChunkHandler.Behaviors { CopyToDestinationFile = mockBehaviors.CopyToDestinationFileTask.Object, From b6535707645c5dd29dd4342b61a7b3eeca216057 Mon Sep 17 00:00:00 2001 From: Jacob Lauzon Date: Mon, 18 Nov 2024 15:55:36 -0800 Subject: [PATCH 07/12] Save chunks in memory --- .../src/Shared/ContentRange.cs | 3 +- .../BlockBlobStartTransferDownloadTests.cs | 50 ------------------- .../src/DownloadChunkHandler.cs | 4 +- .../src/QueueDownloadChunkArgs.cs | 6 +-- .../src/UriToStreamJobPart.cs | 13 ++++- .../tests/DownloadChunkHandlerTests.cs | 16 +++--- 6 files changed, 27 insertions(+), 65 deletions(-) diff --git a/sdk/storage/Azure.Storage.Common/src/Shared/ContentRange.cs b/sdk/storage/Azure.Storage.Common/src/Shared/ContentRange.cs index f656382efad2b..35bccf87d76c3 100644 --- a/sdk/storage/Azure.Storage.Common/src/Shared/ContentRange.cs +++ b/sdk/storage/Azure.Storage.Common/src/Shared/ContentRange.cs @@ -153,7 +153,8 @@ public static HttpRange ToHttpRange(ContentRange contentRange) { // Because constructing HttpRange is the start value, and the length of the range // increment 1 on the end value, since the end value is the end index (not the length). - return new HttpRange(contentRange.Start.Value, contentRange.End.Value + 1); + long length = contentRange.End.Value - contentRange.Start.Value + 1; + return new HttpRange(contentRange.Start.Value, length); } return default; } diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/BlockBlobStartTransferDownloadTests.cs b/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/BlockBlobStartTransferDownloadTests.cs index ab5cbef8e783a..0e4e4310aeb00 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/BlockBlobStartTransferDownloadTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/BlockBlobStartTransferDownloadTests.cs @@ -13,10 +13,6 @@ using Azure.Storage.Test.Shared; using Azure.Core; using Azure.Core.TestFramework; -using NUnit.Framework; -using BaseBlobs::Azure.Storage.Blobs.Models; -using System.Threading; -using System.Linq; namespace Azure.Storage.DataMovement.Blobs.Tests { @@ -104,51 +100,5 @@ public BlobClientOptions GetOptions() return InstrumentClientOptions(options); } - - [Test] - public async Task DownloadTransferTest() - { - BlobServiceClient service = ClientBuilder.GetServiceClient_OAuth(TestEnvironment.Credential); - BlobContainerClient container = service.GetBlobContainerClient("test-download-1"); - using DisposingLocalDirectory testDirectory = DisposingLocalDirectory.GetTestDirectory(); - - //await container.CreateIfNotExistsAsync(); - //Random random = new(); - //foreach (int i in Enumerable.Range(0, 5)) - //{ - // byte[] data = new byte[1048576]; - // random.NextBytes(data); - // await container.UploadBlobAsync($"blob{i}", new BinaryData(data)); - //} - - BlobsStorageResourceProvider blobProvider = new(); - LocalFilesStorageResourceProvider localProvider = new(); - - TransferManager transferManager = new(); - DataTransferOptions options = new() - { - InitialTransferSize = 4096, - MaximumTransferChunkSize = 4096, - }; - TestEventsRaised testEvents = new(options); - DataTransfer transfer = await transferManager.StartTransferAsync( - blobProvider.FromClient(container), - localProvider.FromDirectory(testDirectory.DirectoryPath), - options); - - CancellationTokenSource cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); - await transfer.WaitForCompletionAsync(cts.Token); - testEvents.AssertUnexpectedFailureCheck(); - - await foreach (BlobItem blob in container.GetBlobsAsync()) - { - string localPath = Path.Combine(testDirectory.DirectoryPath, blob.Name); - var response = await container.GetBlobClient(blob.Name).DownloadContentAsync(); - byte[] expected = response.Value.Content.ToArray(); - byte[] actual = File.ReadAllBytes(localPath); - - Assert.That(actual, Is.EqualTo(expected)); - } - } } } diff --git a/sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs b/sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs index 17886dbff61ec..8e5d2701b5937 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs @@ -122,7 +122,7 @@ private async Task NotifyOfPendingChunkDownloadEvents() QueueDownloadChunkArgs args = await _downloadRangeChannel.Reader.ReadAsync(_cancellationToken).ConfigureAwait(false); // Copy the current chunk to the destination - using (Stream content = args.Result) + using (Stream content = args.Content) { await _copyToDestinationFile( args.Offset, @@ -133,7 +133,7 @@ await _copyToDestinationFile( } UpdateBytesAndRange(args.Length); - //Check if we finished downloading the blob + // Check if we finished downloading the blob if (_bytesTransferred == _expectedLength) { await _queueCompleteFileDownload().ConfigureAwait(false); diff --git a/sdk/storage/Azure.Storage.DataMovement/src/QueueDownloadChunkArgs.cs b/sdk/storage/Azure.Storage.DataMovement/src/QueueDownloadChunkArgs.cs index c77b5006cbf00..b770af22af079 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/QueueDownloadChunkArgs.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/QueueDownloadChunkArgs.cs @@ -9,16 +9,16 @@ internal class QueueDownloadChunkArgs { public long Offset { get; } public long Length { get; } - public Stream Result { get; } + public Stream Content { get; } public QueueDownloadChunkArgs( long offset, long length, - Stream result) + Stream content) { Offset = offset; Length = length; - Result = result; + Content = content; } } } diff --git a/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs b/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs index d85630d43f7a2..0687523421abf 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs @@ -394,11 +394,22 @@ internal async Task DownloadStreamingInternal(HttpRange range) (long)range.Length, _cancellationToken).ConfigureAwait(false); + // Stream the data from the network before queueing disk IO. + MemoryStream content = new((int)result.ContentLength.Value); + using (Stream dataStream = result.Content) + { + await dataStream.CopyToAsync( + content, + DataMovementConstants.DefaultStreamCopyBufferSize, + _cancellationToken).ConfigureAwait(false); + } + content.Position = 0; + // The chunk handler may have been disposed in failure case _downloadChunkHandler?.QueueChunk(new QueueDownloadChunkArgs( offset: range.Offset, length: (long)range.Length, - result: result.Content)); + content: content)); } catch (Exception ex) { diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/DownloadChunkHandlerTests.cs b/sdk/storage/Azure.Storage.DataMovement/tests/DownloadChunkHandlerTests.cs index 4710762d60680..43e79dbbe6c0e 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/DownloadChunkHandlerTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/DownloadChunkHandlerTests.cs @@ -178,7 +178,7 @@ public void OneChunkTransfer(long blockSize) downloadChunkHandler.QueueChunk(new QueueDownloadChunkArgs( offset: 0, length: blockSize, - result: content)); + content: content)); // Assert VerifyDelegateInvocations( @@ -215,7 +215,7 @@ public void MultipleChunkTransfer(long blockSize) downloadChunkHandler.QueueChunk(new QueueDownloadChunkArgs( offset: 0, length: blockSize, - result: content)); + content: content)); // Assert VerifyDelegateInvocations( @@ -231,7 +231,7 @@ public void MultipleChunkTransfer(long blockSize) downloadChunkHandler.QueueChunk(new QueueDownloadChunkArgs( offset: blockSize, length: blockSize, - result: content2)); + content: content2)); // Assert VerifyDelegateInvocations( @@ -269,7 +269,7 @@ public void MultipleChunkTransfer_EarlyChunks(long blockSize) downloadChunkHandler.QueueChunk(new QueueDownloadChunkArgs( offset: blockSize, length: blockSize, - result: content)); + content: content)); // Assert VerifyDelegateInvocations( @@ -283,7 +283,7 @@ public void MultipleChunkTransfer_EarlyChunks(long blockSize) downloadChunkHandler.QueueChunk(new QueueDownloadChunkArgs( offset: 0, length: blockSize, - result: content)); + content: content)); // Assert VerifyDelegateInvocations( @@ -328,7 +328,7 @@ public async Task MultipleChunkTransfer_MultipleProcesses(long blockSize, int ta Task task = Task.Run(() => downloadChunkHandler.QueueChunk(new QueueDownloadChunkArgs( offset: offset, length: blockSize, - result: content))); + content: content))); runningTasks.Add(task); } @@ -372,7 +372,7 @@ public void GetCopyToDestinationFileTask_ExpectedFailure() downloadChunkHandler.QueueChunk(new QueueDownloadChunkArgs( offset: 0, length: blockSize, - result: content)); + content: content)); // Assert VerifyDelegateInvocations( @@ -409,7 +409,7 @@ public void QueueCompleteFileDownloadTask_ExpectedFailure() downloadChunkHandler.QueueChunk(new QueueDownloadChunkArgs( offset: 0, length: blockSize, - result: content)); + content: content)); // Assert VerifyDelegateInvocations( From 005398ee70cb277eb9b1ea341fa4a697dea165fe Mon Sep 17 00:00:00 2001 From: Jacob Lauzon Date: Mon, 18 Nov 2024 16:04:17 -0800 Subject: [PATCH 08/12] Export API, new perf test --- sdk/storage/Azure.Storage.DataMovement.Blobs/perf-tests.yml | 1 + .../api/Azure.Storage.DataMovement.net6.0.cs | 1 + .../api/Azure.Storage.DataMovement.netstandard2.0.cs | 1 + 3 files changed, 3 insertions(+) diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/perf-tests.yml b/sdk/storage/Azure.Storage.DataMovement.Blobs/perf-tests.yml index d77d2310bcf3e..d2baad6e9a7e8 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/perf-tests.yml +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/perf-tests.yml @@ -15,6 +15,7 @@ Tests: Arguments: &sizes - --size 1024 --count 5000 --duration 60 --concurrency 64 - --size 10485760 --count 500 --duration 90 --concurrency 64 + - --size 52428800 --count 200 --duration 120 --concurrency 64 - --size 1073741824 --count 5 --duration 120 --concurrency 64 - Test: upload diff --git a/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.net6.0.cs b/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.net6.0.cs index 128331d09d4b6..bbeee3c8f9648 100644 --- a/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.net6.0.cs +++ b/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.net6.0.cs @@ -220,6 +220,7 @@ public partial class StorageResourceWriteToOffsetOptions { public StorageResourceWriteToOffsetOptions() { } public string BlockId { get { throw null; } set { } } + public bool Initial { get { throw null; } set { } } public long? Position { get { throw null; } set { } } public Azure.Storage.DataMovement.StorageResourceItemProperties SourceProperties { get { throw null; } set { } } } diff --git a/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.netstandard2.0.cs b/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.netstandard2.0.cs index 128331d09d4b6..bbeee3c8f9648 100644 --- a/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.netstandard2.0.cs +++ b/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.netstandard2.0.cs @@ -220,6 +220,7 @@ public partial class StorageResourceWriteToOffsetOptions { public StorageResourceWriteToOffsetOptions() { } public string BlockId { get { throw null; } set { } } + public bool Initial { get { throw null; } set { } } public long? Position { get { throw null; } set { } } public Azure.Storage.DataMovement.StorageResourceItemProperties SourceProperties { get { throw null; } set { } } } From e68e25067dd7292637c30045b157b372c6f1df76 Mon Sep 17 00:00:00 2001 From: Jacob Lauzon Date: Mon, 18 Nov 2024 16:15:51 -0800 Subject: [PATCH 09/12] Unused import --- .../tests/BlockBlobDirectoryToDirectoryTests.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/BlockBlobDirectoryToDirectoryTests.cs b/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/BlockBlobDirectoryToDirectoryTests.cs index 56d97860365ff..a1771d263f8b3 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/BlockBlobDirectoryToDirectoryTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/BlockBlobDirectoryToDirectoryTests.cs @@ -11,7 +11,6 @@ using BaseBlobs::Azure.Storage.Blobs.Models; using BaseBlobs::Azure.Storage.Blobs.Specialized; using DMBlobs::Azure.Storage.DataMovement.Blobs; -using NUnit.Framework; namespace Azure.Storage.DataMovement.Blobs.Tests { From a4ea0f06a5d74fc21428bfcfd3f29528ae7f1fb7 Mon Sep 17 00:00:00 2001 From: Jacob Lauzon Date: Mon, 18 Nov 2024 17:24:39 -0800 Subject: [PATCH 10/12] Fix for unknown length downloads --- .../Azure.Storage.DataMovement/src/DownloadChunkHandler.cs | 5 +---- .../Azure.Storage.DataMovement/src/UriToStreamJobPart.cs | 1 - 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs b/sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs index 8e5d2701b5937..0f59fea48999c 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs @@ -42,7 +42,6 @@ public struct Behaviors private long _bytesTransferred; private readonly long _expectedLength; - private int _chunkTransferred; /// /// The controller for downloading the chunks to each file. @@ -70,7 +69,6 @@ public DownloadChunkHandler( // Set bytes transferred to the length of bytes we got back from the initial // download request _bytesTransferred = currentTransferred; - _chunkTransferred = 0; // The size of the channel should never exceed 50k (limit on blocks in a block blob). // and that's in the worst case that we never read from the channel and had a maximum chunk blob. @@ -129,7 +127,7 @@ await _copyToDestinationFile( args.Length, content, _expectedLength, - initial: _chunkTransferred == 0).ConfigureAwait(false); + initial: _bytesTransferred == 0).ConfigureAwait(false); } UpdateBytesAndRange(args.Length); @@ -153,7 +151,6 @@ await _copyToDestinationFile( /// private void UpdateBytesAndRange(long bytesDownloaded) { - _chunkTransferred++; _bytesTransferred += bytesDownloaded; _reportProgressInBytes(bytesDownloaded); } diff --git a/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs b/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs index 0687523421abf..d159d85781c3f 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs @@ -8,7 +8,6 @@ using System.Threading; using Azure.Core; using Azure.Storage.Common; -using System.Linq; namespace Azure.Storage.DataMovement { From 6b54907c196a7cc78232123e71581933cb37d1b8 Mon Sep 17 00:00:00 2001 From: Jacob Lauzon Date: Mon, 18 Nov 2024 17:37:50 -0800 Subject: [PATCH 11/12] Re-export API - now with .NET 8 --- .../api/Azure.Storage.DataMovement.net8.0.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.net8.0.cs b/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.net8.0.cs index 6df462c6c90b7..da3c954f78120 100644 --- a/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.net8.0.cs +++ b/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.net8.0.cs @@ -220,6 +220,7 @@ public partial class StorageResourceWriteToOffsetOptions { public StorageResourceWriteToOffsetOptions() { } public string BlockId { get { throw null; } set { } } + public bool Initial { get { throw null; } set { } } public long? Position { get { throw null; } set { } } public Azure.Storage.DataMovement.StorageResourceItemProperties SourceProperties { get { throw null; } set { } } } From 2fa4663e45f2e94538ab1801329edc12f2fdd226 Mon Sep 17 00:00:00 2001 From: Jacob Lauzon Date: Mon, 18 Nov 2024 18:06:36 -0800 Subject: [PATCH 12/12] Fix tests --- .../tests/LocalFileStorageResourceTests.cs | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/LocalFileStorageResourceTests.cs b/sdk/storage/Azure.Storage.DataMovement/tests/LocalFileStorageResourceTests.cs index 9460d7fa207cd..2a87e2c9497ba 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/LocalFileStorageResourceTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/LocalFileStorageResourceTests.cs @@ -170,7 +170,8 @@ await storageResource.CopyFromStreamAsync( stream, streamLength: length, false, - completeLength: length); + completeLength: length, + options: new StorageResourceWriteToOffsetOptions() { Initial = true }); } // Assert @@ -200,7 +201,7 @@ await storageResource.CopyFromStreamAsync( streamLength: length, overwrite: false, completeLength: length, - options: new StorageResourceWriteToOffsetOptions() { Position = writePosition }); + options: new StorageResourceWriteToOffsetOptions() { Position = writePosition, Initial = false }); } // Assert @@ -219,7 +220,7 @@ public async Task WriteStreamAsync_Error() string path = await CreateRandomFileAsync(test.DirectoryPath, size: length); LocalFileStorageResource storageResource = new LocalFileStorageResource(path); var data = GetRandomBuffer(length); - try + Assert.ThrowsAsync(async () => { using (var stream = new MemoryStream(data)) { @@ -227,13 +228,11 @@ await storageResource.CopyFromStreamAsync( stream: stream, streamLength: length, overwrite: false, - completeLength: length); + completeLength: length, + options: new StorageResourceWriteToOffsetOptions() { Initial = true }); } - } - catch (IOException ex) - { - Assert.AreEqual(ex.Message, $"File path `{path}` already exists. Cannot overwrite file."); - } + }, + $"File path `{path}` already exists. Cannot overwrite file."); } [Test]