Skip to content

Commit

Permalink
[Storage][DataMovement] Fixes to pause/resume around enumeration (Azu…
Browse files Browse the repository at this point in the history
  • Loading branch information
jalauzon-msft authored and matthohn-msft committed Oct 27, 2023
1 parent 53d9906 commit 56a8cf8
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public override async IAsyncEnumerable<JobPartInternal> ProcessJobToJobPartAsync
job: this,
partNumber: partNumber).ConfigureAwait(false);
AppendJobPart(part);
await OnAllResourcesEnumerated().ConfigureAwait(false);
}
catch (Exception ex)
{
Expand Down Expand Up @@ -122,6 +123,7 @@ public override async IAsyncEnumerable<JobPartInternal> ProcessJobToJobPartAsync
}
}

// Call regardless of the outcome of enumeration so job can pause/finish
await OnEnumerationComplete().ConfigureAwait(false);
}

Expand All @@ -130,7 +132,7 @@ private async IAsyncEnumerable<JobPartInternal> GetStorageResourcesAsync()
// Start the partNumber based on the last part number. If this is a new job,
// the count will automatically be at 0 (the beginning).
int partNumber = _jobParts.Count;
List<string> existingSources = GetJobPartSourceResourcePaths();
HashSet<Uri> existingSources = GetJobPartSourceResourcePaths();
// Call listing operation on the source container
IAsyncEnumerator<StorageResource> enumerator;

Expand All @@ -154,8 +156,10 @@ private async IAsyncEnumerable<JobPartInternal> GetStorageResourcesAsync()
{
try
{
_cancellationToken.ThrowIfCancellationRequested();
if (!await enumerator.MoveNextAsync().ConfigureAwait(false))
{
await OnAllResourcesEnumerated().ConfigureAwait(false);
enumerationCompleted = true;
continue;
}
Expand All @@ -167,17 +171,13 @@ private async IAsyncEnumerable<JobPartInternal> GetStorageResourcesAsync()
}

StorageResource current = enumerator.Current;

string containerUriPath = _sourceResourceContainer.Uri.GetPath();
string sourceName = string.IsNullOrEmpty(containerUriPath)
? current.Uri.GetPath()
: current.Uri.GetPath().Substring(containerUriPath.Length + 1);

if (!existingSources.Contains(sourceName))
if (!existingSources.Contains(current.Uri))
{
// Because AsyncEnumerable doesn't let us know which storage resource is the last resource
// we only yield return when we know this is not the last storage resource to be listed
// from the container.
string containerUriPath = _sourceResourceContainer.Uri.GetPath();
string sourceName = string.IsNullOrEmpty(containerUriPath)
? current.Uri.GetPath()
: current.Uri.GetPath().Substring(containerUriPath.Length + 1);

ServiceToServiceJobPart part;
try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public override async IAsyncEnumerable<JobPartInternal> ProcessJobToJobPartAsync
job: this,
partNumber: partNumber).ConfigureAwait(false);
AppendJobPart(part);
await OnAllResourcesEnumerated().ConfigureAwait(false);
}
catch (Exception ex)
{
Expand Down Expand Up @@ -120,6 +121,7 @@ public override async IAsyncEnumerable<JobPartInternal> ProcessJobToJobPartAsync
}
}

// Call regardless of the outcome of enumeration so job can pause/finish
await OnEnumerationComplete().ConfigureAwait(false);
}

Expand All @@ -128,7 +130,7 @@ private async IAsyncEnumerable<JobPartInternal> GetStorageResourcesAsync()
// Start the partNumber based on the last part number. If this is a new job,
// the count will automatically be at 0 (the beginning).
int partNumber = _jobParts.Count;
List<string> existingSources = GetJobPartSourceResourcePaths();
HashSet<Uri> existingSources = GetJobPartSourceResourcePaths();
// Call listing operation on the source container
IAsyncEnumerator<StorageResource> enumerator;

Expand All @@ -152,8 +154,10 @@ private async IAsyncEnumerable<JobPartInternal> GetStorageResourcesAsync()
{
try
{
_cancellationToken.ThrowIfCancellationRequested();
if (!await enumerator.MoveNextAsync().ConfigureAwait(false))
{
await OnAllResourcesEnumerated().ConfigureAwait(false);
enumerationCompleted = true;
continue;
}
Expand All @@ -165,17 +169,13 @@ private async IAsyncEnumerable<JobPartInternal> GetStorageResourcesAsync()
}

StorageResource current = enumerator.Current;

string containerUriPath = _sourceResourceContainer.Uri.GetPath();
string sourceName = string.IsNullOrEmpty(containerUriPath)
? current.Uri.GetPath()
: current.Uri.GetPath().Substring(containerUriPath.Length + 1);

if (!existingSources.Contains(sourceName))
if (!existingSources.Contains(current.Uri))
{
// Because AsyncEnumerable doesn't let us know which storage resource is the last resource
// we only yield return when we know this is not the last storage resource to be listed
// from the container.
string containerUriPath = _sourceResourceContainer.Uri.GetPath();
string sourceName = string.IsNullOrEmpty(containerUriPath)
? current.Uri.GetPath()
: current.Uri.GetPath().Substring(containerUriPath.Length + 1);

StreamToUriJobPart part;
try
{
Expand Down
19 changes: 15 additions & 4 deletions sdk/storage/Azure.Storage.DataMovement/src/TransferJobInternal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -403,10 +403,13 @@ await _checkpointer.SetJobTransferStatusAsync(
status: _dataTransfer.TransferStatus).ConfigureAwait(false);
}

internal async Task OnEnumerationComplete()
/// <summary>
/// Called when enumeration is complete whether it finished successfully, failed, or was paused.
/// All resources may or may not have been enumerated.
/// </summary>
protected async Task OnEnumerationComplete()
{
_enumerationComplete = true;
await _checkpointer.OnEnumerationCompleteAsync(_dataTransfer.Id).ConfigureAwait(false);

// If there were no job parts enumerated and we haven't already aborted/completed the job.
if (_jobParts.Count == 0 &&
Expand All @@ -426,6 +429,14 @@ internal async Task OnEnumerationComplete()
await CheckAndUpdateStatusAsync().ConfigureAwait(false);
}

/// <summary>
/// Invoked once every resource has been enumerated successfully. Forwards the
/// notification to the checkpointer via OnEnumerationCompleteAsync for this
/// transfer's id.
/// </summary>
protected async Task OnAllResourcesEnumerated()
    => await _checkpointer.OnEnumerationCompleteAsync(_dataTransfer.Id).ConfigureAwait(false);

internal async Task CheckAndUpdateStatusAsync()
{
// If we had a failure or pause during listing, we need to set the status correctly.
Expand Down Expand Up @@ -469,9 +480,9 @@ public void AppendJobPart(JobPartInternal jobPart)
}
}

internal List<string> GetJobPartSourceResourcePaths()
internal HashSet<Uri> GetJobPartSourceResourcePaths()
{
return _jobParts.Select( x => x._sourceResource.Uri.GetPath() ).ToList();
return new HashSet<Uri>(_jobParts.Select(x => x._sourceResource.Uri));
}

internal void QueueJobPart()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public override async IAsyncEnumerable<JobPartInternal> ProcessJobToJobPartAsync
job: this,
partNumber: partNumber).ConfigureAwait(false);
AppendJobPart(part);
await OnAllResourcesEnumerated().ConfigureAwait(false);
}
catch (Exception ex)
{
Expand Down Expand Up @@ -120,6 +121,7 @@ public override async IAsyncEnumerable<JobPartInternal> ProcessJobToJobPartAsync
}
}

// Call regardless of the outcome of enumeration so job can pause/finish
await OnEnumerationComplete().ConfigureAwait(false);
}

Expand All @@ -128,7 +130,7 @@ private async IAsyncEnumerable<JobPartInternal> GetStorageResourcesAsync()
// Start the partNumber based on the last part number. If this is a new job,
// the count will automatically be at 0 (the beginning).
int partNumber = _jobParts.Count;
List<string> existingSources = GetJobPartSourceResourcePaths();
HashSet<Uri> existingSources = GetJobPartSourceResourcePaths();
// Call listing operation on the source container
IAsyncEnumerator<StorageResource> enumerator;

Expand All @@ -152,8 +154,10 @@ private async IAsyncEnumerable<JobPartInternal> GetStorageResourcesAsync()
{
try
{
_cancellationToken.ThrowIfCancellationRequested();
if (!await enumerator.MoveNextAsync().ConfigureAwait(false))
{
await OnAllResourcesEnumerated().ConfigureAwait(false);
enumerationCompleted = true;
continue;
}
Expand All @@ -165,17 +169,13 @@ private async IAsyncEnumerable<JobPartInternal> GetStorageResourcesAsync()
}

StorageResource current = enumerator.Current;

string containerUriPath = _sourceResourceContainer.Uri.GetPath();
string sourceName = string.IsNullOrEmpty(containerUriPath)
? current.Uri.GetPath()
: current.Uri.GetPath().Substring(containerUriPath.Length + 1);

if (!existingSources.Contains(sourceName))
if (!existingSources.Contains(current.Uri))
{
// Because AsyncEnumerable doesn't let us know which storage resource is the last resource
// we only yield return when we know this is not the last storage resource to be listed
// from the container.
string containerUriPath = _sourceResourceContainer.Uri.GetPath();
string sourceName = string.IsNullOrEmpty(containerUriPath)
? current.Uri.GetPath()
: current.Uri.GetPath().Substring(containerUriPath.Length + 1);

UriToStreamJobPart part;
try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1097,6 +1097,79 @@ await AssertDirectorySourceAndDestinationAsync(
destinationContainer: destinationContainer.Container);
}

/// <summary>
/// Manual pause/resume scenario for directory transfers: starts a transfer, waits
/// <paramref name="delayInMs"/> milliseconds, pauses it, resumes it, and then verifies
/// that the resumed transfer completes and that source and destination match.
/// </summary>
/// <param name="transferType">Direction of the transfer under test (upload, download, or copy).</param>
/// <param name="blobCount">Passed as the transfer count when creating the source and destination resources.</param>
/// <param name="delayInMs">Time to wait, in milliseconds, between starting the transfer and pausing it.</param>
[Ignore("Likely to fail in pipelines and takes a while to run.")]
[Test, Pairwise]
[LiveOnly]
public async Task ResumeTransferAsync_Directory_Large(
    [Values(TransferDirection.Upload, TransferDirection.Download, TransferDirection.Copy)] TransferDirection transferType,
    [Values(100)] int blobCount,
    [Values(0, 500, 2000)] int delayInMs)
{
    // This test is not really meant to run in a pipeline and may fail locally
    // depending on timing. It's meant more as a starting place to attempt testing
    // pause/resume in different states of the transfer. You may also find that adding
    // delays in certain parts of the code while testing can help get more
    // consistent results.

    // Arrange
    using DisposingLocalDirectory checkpointerDirectory = DisposingLocalDirectory.GetTestDirectory();
    using DisposingLocalDirectory sourceDirectory = DisposingLocalDirectory.GetTestDirectory();
    using DisposingLocalDirectory destinationDirectory = DisposingLocalDirectory.GetTestDirectory();
    await using DisposingContainer sourceContainer = await GetTestContainerAsync(publicAccessType: PublicAccessType.BlobContainer);
    await using DisposingContainer destinationContainer = await GetTestContainerAsync();

    BlobsStorageResourceProvider blobProvider = new(GetSharedKeyCredential());
    LocalFilesStorageResourceProvider localProvider = new();
    TransferManagerOptions options = new TransferManagerOptions()
    {
        CheckpointerOptions = new TransferCheckpointStoreOptions(checkpointerDirectory.DirectoryPath),
        ErrorHandling = DataTransferErrorMode.ContinueOnFailure,
        ResumeProviders = new() { blobProvider, localProvider },
    };
    TransferManager transferManager = new TransferManager(options);
    long size = Constants.MB;

    (StorageResource sResource, StorageResource dResource) = await CreateStorageResourceContainersAsync(
        transferType: transferType,
        size: size,
        transferCount: blobCount,
        sourceDirectoryPath: sourceDirectory.DirectoryPath,
        destinationDirectoryPath: destinationDirectory.DirectoryPath,
        sourceContainer: sourceContainer.Container,
        destinationContainer: destinationContainer.Container,
        blobProvider: blobProvider,
        localProvider: localProvider);

    // Start transfer
    DataTransfer transfer = await transferManager.StartTransferAsync(sResource, dResource);

    // Sleep before pausing
    await Task.Delay(delayInMs);

    // Pause Transfer
    // The timeout-based token sources own timers, so dispose them when the test ends.
    using CancellationTokenSource pauseCancellation = new CancellationTokenSource(TimeSpan.FromSeconds(30));
    await transferManager.PauseTransferIfRunningAsync(transfer.Id, pauseCancellation.Token);
    Assert.AreEqual(DataTransferState.Paused, transfer.TransferStatus.State);

    // Resume Transfer
    DataTransfer resumeTransfer = await transferManager.ResumeTransferAsync(transfer.Id);

    using CancellationTokenSource waitTransferCompletion = new CancellationTokenSource(TimeSpan.FromSeconds(600));
    await resumeTransfer.WaitForCompletionAsync(waitTransferCompletion.Token);

    // Assert
    Assert.AreEqual(DataTransferState.Completed, resumeTransfer.TransferStatus.State);
    Assert.IsTrue(resumeTransfer.HasCompleted);

    // Verify transfer
    await AssertDirectorySourceAndDestinationAsync(
        transferType: transferType,
        sourceResource: sResource as StorageResourceContainer,
        destinationResource: dResource as StorageResourceContainer,
        sourceContainer: sourceContainer.Container,
        destinationContainer: destinationContainer.Container);
}

[Test]
public async Task PauseAllTriggersCorrectPauses()
{
Expand Down

0 comments on commit 56a8cf8

Please sign in to comment.