Skip to content

Commit

Permalink
[Storage] [DataMovement] Added exception handling during internal tra…
Browse files Browse the repository at this point in the history
…nsfer (#46968)

* initial commit

* removed file exist check
  • Loading branch information
nickliu-msft authored Nov 5, 2024
1 parent e9e07dd commit 91389b4
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,20 @@ public static async Task<JobPartPlanFile> CreateJobPartPlanFileAsync(
FileName = fileName
};

using (FileStream fileStream = File.Create(result.FileName.ToString()))
try
{
await headerStream.CopyToAsync(fileStream, DefaultBufferSize, cancellationToken).ConfigureAwait(false);
using (FileStream fileStream = File.Create(result.FileName.ToString()))
{
await headerStream.CopyToAsync(fileStream, DefaultBufferSize, cancellationToken).ConfigureAwait(false);
}
}
catch (Exception)
{
// will handle if file has not been created yet
File.Delete(result.FileName.ToString());
throw;
}

return result;
}

Expand Down
13 changes: 11 additions & 2 deletions sdk/storage/Azure.Storage.DataMovement/src/JobPlan/JobPlanFile.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,18 @@ public static async Task<JobPlanFile> CreateJobPlanFileAsync(
string filePath = Path.Combine(checkpointerPath, fileName);

JobPlanFile jobPlanFile = new(id, filePath);
using (FileStream fileStream = File.Create(jobPlanFile.FilePath))
try
{
await headerStream.CopyToAsync(fileStream, DefaultBufferSize, cancellationToken).ConfigureAwait(false);
using (FileStream fileStream = File.Create(jobPlanFile.FilePath))
{
await headerStream.CopyToAsync(fileStream, DefaultBufferSize, cancellationToken).ConfigureAwait(false);
}
}
catch (Exception)
{
// will handle if file has not been created yet
File.Delete(jobPlanFile.FilePath);
throw;
}

return jobPlanFile;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,13 @@ public override async Task AddNewJobPartAsync(
CancellationHelper.ThrowIfCancellationRequested(cancellationToken);
headerStream.Position = 0;

if (!_transferStates.ContainsKey(transferId))
{
// We should never get here because AddNewJobAsync should
// always be called first.
throw Errors.MissingTransferIdAddPartCheckpointer(transferId, partNumber);
}

JobPartPlanFile mappedFile = await JobPartPlanFile.CreateJobPartPlanFileAsync(
_pathToCheckpointer,
transferId,
Expand All @@ -121,16 +128,7 @@ public override async Task AddNewJobPartAsync(
cancellationToken).ConfigureAwait(false);

// Add the job part into the current state
if (_transferStates.ContainsKey(transferId))
{
_transferStates[transferId].JobParts.Add(partNumber, mappedFile);
}
else
{
// We should never get here because AddNewJobAsync should
// always be called first.
throw Errors.MissingTransferIdAddPartCheckpointer(transferId, partNumber);
}
_transferStates[transferId].JobParts.Add(partNumber, mappedFile);
}

public override Task<int> CurrentJobPartCountAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,12 @@ public static ServiceToServiceJobPart CreateJobPartFromCheckpoint(

public override async Task ProcessPartToChunkAsync()
{
await OnTransferStateChangedAsync(DataTransferState.InProgress).ConfigureAwait(false);

long? fileLength = default;
StorageResourceItemProperties sourceProperties = default;
try
{
await OnTransferStateChangedAsync(DataTransferState.InProgress).ConfigureAwait(false);

long? fileLength = default;
StorageResourceItemProperties sourceProperties = default;
fileLength = _sourceResource.Length;
sourceProperties = await _sourceResource.GetPropertiesAsync(_cancellationToken).ConfigureAwait(false);
await _destinationResource.SetPermissionsAsync(
Expand All @@ -193,60 +193,58 @@ await _destinationResource.SetPermissionsAsync(
_cancellationToken).ConfigureAwait(false);

fileLength = sourceProperties.ResourceLength;
}
catch (Exception ex)
{
// TODO: logging when given the event handler
await InvokeFailedArg(ex).ConfigureAwait(false);
return;
}
if (!fileLength.HasValue)
{
await InvokeFailedArg(Errors.UnableToGetLength()).ConfigureAwait(false);
return;
}
long length = fileLength.Value;
if (!fileLength.HasValue)
{
await InvokeFailedArg(Errors.UnableToGetLength()).ConfigureAwait(false);
return;
}
long length = fileLength.Value;

// Perform a single copy operation
if (_initialTransferSize >= length)
{
await QueueChunkToChannelAsync(
async () =>
await StartSingleCallCopy(length).ConfigureAwait(false))
.ConfigureAwait(false);
return;
}
// Perform a single copy operation
if (_initialTransferSize >= length)
{
await QueueChunkToChannelAsync(
async () =>
await StartSingleCallCopy(length).ConfigureAwait(false))
.ConfigureAwait(false);
return;
}

// Perform a series of chunk copies followed by a commit
long blockSize = _transferChunkSize;

_commitBlockHandler = GetCommitController(
expectedLength: length,
blockSize: blockSize,
this,
_destinationResource.TransferType,
sourceProperties);
// If we cannot upload in one shot, initiate the parallel block uploader
if (await CreateDestinationResource(length, blockSize).ConfigureAwait(false))
{
List<(long Offset, long Length)> commitBlockList = GetRangeList(blockSize, length);
if (_destinationResource.TransferType == DataTransferOrder.Unordered)
// Perform a series of chunk copies followed by a commit
long blockSize = _transferChunkSize;

_commitBlockHandler = GetCommitController(
expectedLength: length,
blockSize: blockSize,
this,
_destinationResource.TransferType,
sourceProperties);
// If we cannot upload in one shot, initiate the parallel block uploader
if (await CreateDestinationResource(length, blockSize).ConfigureAwait(false))
{
await QueueStageBlockRequests(commitBlockList, length, sourceProperties).ConfigureAwait(false);
List<(long Offset, long Length)> commitBlockList = GetRangeList(blockSize, length);
if (_destinationResource.TransferType == DataTransferOrder.Unordered)
{
await QueueStageBlockRequests(commitBlockList, length, sourceProperties).ConfigureAwait(false);
}
else // Sequential
{
// Queue the first partitioned block task
await QueueStageBlockRequest(
commitBlockList[0].Offset,
commitBlockList[0].Length,
length,
sourceProperties).ConfigureAwait(false);
}
}
else // Sequential
else
{
// Queue the first partitioned block task
await QueueStageBlockRequest(
commitBlockList[0].Offset,
commitBlockList[0].Length,
length,
sourceProperties).ConfigureAwait(false);
await CheckAndUpdateCancellationStateAsync().ConfigureAwait(false);
}
}
else
catch (Exception ex)
{
await CheckAndUpdateCancellationStateAsync().ConfigureAwait(false);
await InvokeFailedArg(ex).ConfigureAwait(false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,10 @@ public override async Task ProcessPartToChunkAsync()
// Attempt to get the length, it's possible the file could
// not be accessible (or does not exist).
string operationName = $"{nameof(TransferManager.StartTransferAsync)}";
await OnTransferStateChangedAsync(DataTransferState.InProgress).ConfigureAwait(false);
long? fileLength = default;
try
{
await OnTransferStateChangedAsync(DataTransferState.InProgress).ConfigureAwait(false);
long? fileLength = default;
StorageResourceItemProperties properties = await _sourceResource.GetPropertiesAsync(_cancellationToken).ConfigureAwait(false);
fileLength = properties.ResourceLength;

Expand Down
34 changes: 30 additions & 4 deletions sdk/storage/Azure.Storage.DataMovement/src/TransferJobInternal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,15 @@ public void DisposeHandlers()
/// <returns>An IEnumerable that contains the job parts</returns>
public virtual async IAsyncEnumerable<JobPartInternal> ProcessJobToJobPartAsync()
{
await OnJobStateChangedAsync(DataTransferState.InProgress).ConfigureAwait(false);
try
{
await OnJobStateChangedAsync(DataTransferState.InProgress).ConfigureAwait(false);
}
catch (Exception ex)
{
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
yield break;
}
int partNumber = 0;

if (_jobParts.Count == 0)
Expand Down Expand Up @@ -324,7 +332,18 @@ public virtual async IAsyncEnumerable<JobPartInternal> ProcessJobToJobPartAsync(
}
}

if (!await _checkpointer.IsEnumerationCompleteAsync(_dataTransfer.Id, _cancellationToken).ConfigureAwait(false))
bool isEnumerationComplete;
try
{
isEnumerationComplete = await _checkpointer.IsEnumerationCompleteAsync(_dataTransfer.Id, _cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
yield break;
}

if (!isEnumerationComplete)
{
await foreach (JobPartInternal jobPartInternal in GetStorageResourcesAsync().ConfigureAwait(false))
{
Expand All @@ -333,8 +352,15 @@ public virtual async IAsyncEnumerable<JobPartInternal> ProcessJobToJobPartAsync(
}
}

// Call regardless of the outcome of enumeration so job can pause/finish
await OnEnumerationComplete().ConfigureAwait(false);
try
{
// Call regardless of the outcome of enumeration so job can pause/finish
await OnEnumerationComplete().ConfigureAwait(false);
}
catch (Exception ex)
{
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
}
}

private async IAsyncEnumerable<JobPartInternal> GetStorageResourcesAsync()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,9 @@ public override async Task ProcessPartToChunkAsync()
{
// we can default the length to 0 because we know the destination is local and
// does not require a length to be created.
await OnTransferStateChangedAsync(DataTransferState.InProgress).ConfigureAwait(false);

try
{
await OnTransferStateChangedAsync(DataTransferState.InProgress).ConfigureAwait(false);
if (!_sourceResource.Length.HasValue)
{
await UnknownDownloadInternal().ConfigureAwait(false);
Expand Down

0 comments on commit 91389b4

Please sign in to comment.