Skip to content

Commit

Permalink
Checkpointer testing cont. (#47125)
Browse files Browse the repository at this point in the history
* change checkpointer API

Accept header object instead of stream

* move serialze closer to checkpoint write

* error report bug fix

* adapt tests to new interface

* exportapi
  • Loading branch information
jaschrep-msft authored Nov 13, 2024
1 parent b8a7f8a commit 748c9c4
Show file tree
Hide file tree
Showing 14 changed files with 308 additions and 181 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
<Compile Include="$(AzureStorageDataMovementTestSharedSources)TestEventsRaised.cs" LinkBase="Shared\DataMovement" />
<Compile Include="$(AzureStorageDataMovementTestSharedSources)TestTransferWithTimeout.cs" LinkBase="Shared\DataMovement" />
<Compile Include="$(AzureStorageDataMovementTestSharedSources)DisposingLocalDirectory.cs" LinkBase="Shared\DataMovement" />
<Compile Include="$(AzureStorageDataMovementTestSharedSources)MemoryTransferCheckpointer.cs" LinkBase="Shared\DataMovement" />
<Compile Include="$(AzureStorageDataMovementTestSharedSources)MockQueueInternalTasks.cs" LinkBase="Shared\DataMovement" />
<Compile Include="$(AzureStorageDataMovementTestSharedSources)MockResourceCheckpointData.cs" LinkBase="Shared\DataMovement" />
<Compile Include="$(AzureStorageDataMovementTestSharedSources)StartTransferUploadTestBase.cs" LinkBase="Shared\DataMovement" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,10 @@ public partial class TransferItemCompletedEventArgs : Azure.Storage.DataMovement
}
public partial class TransferItemFailedEventArgs : Azure.Storage.DataMovement.DataTransferEventArgs
{
public TransferItemFailedEventArgs(string transferId, Azure.Storage.DataMovement.StorageResourceItem sourceResource, Azure.Storage.DataMovement.StorageResourceItem destinationResource, System.Exception exception, bool isRunningSynchronously, System.Threading.CancellationToken cancellationToken) : base (default(string), default(bool), default(System.Threading.CancellationToken)) { }
public Azure.Storage.DataMovement.StorageResourceItem DestinationResource { get { throw null; } }
public TransferItemFailedEventArgs(string transferId, Azure.Storage.DataMovement.StorageResource sourceResource, Azure.Storage.DataMovement.StorageResource destinationResource, System.Exception exception, bool isRunningSynchronously, System.Threading.CancellationToken cancellationToken) : base (default(string), default(bool), default(System.Threading.CancellationToken)) { }
public Azure.Storage.DataMovement.StorageResource DestinationResource { get { throw null; } }
public System.Exception Exception { get { throw null; } }
public Azure.Storage.DataMovement.StorageResourceItem SourceResource { get { throw null; } }
public Azure.Storage.DataMovement.StorageResource SourceResource { get { throw null; } }
}
public partial class TransferItemSkippedEventArgs : Azure.Storage.DataMovement.DataTransferEventArgs
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,10 @@ public partial class TransferItemCompletedEventArgs : Azure.Storage.DataMovement
}
public partial class TransferItemFailedEventArgs : Azure.Storage.DataMovement.DataTransferEventArgs
{
public TransferItemFailedEventArgs(string transferId, Azure.Storage.DataMovement.StorageResourceItem sourceResource, Azure.Storage.DataMovement.StorageResourceItem destinationResource, System.Exception exception, bool isRunningSynchronously, System.Threading.CancellationToken cancellationToken) : base (default(string), default(bool), default(System.Threading.CancellationToken)) { }
public Azure.Storage.DataMovement.StorageResourceItem DestinationResource { get { throw null; } }
public TransferItemFailedEventArgs(string transferId, Azure.Storage.DataMovement.StorageResource sourceResource, Azure.Storage.DataMovement.StorageResource destinationResource, System.Exception exception, bool isRunningSynchronously, System.Threading.CancellationToken cancellationToken) : base (default(string), default(bool), default(System.Threading.CancellationToken)) { }
public Azure.Storage.DataMovement.StorageResource DestinationResource { get { throw null; } }
public System.Exception Exception { get { throw null; } }
public Azure.Storage.DataMovement.StorageResourceItem SourceResource { get { throw null; } }
public Azure.Storage.DataMovement.StorageResource SourceResource { get { throw null; } }
}
public partial class TransferItemSkippedEventArgs : Azure.Storage.DataMovement.DataTransferEventArgs
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public Task AddNewJobAsync(string transferId, StorageResource source, StorageRes
return Task.CompletedTask;
}

public Task AddNewJobPartAsync(string transferId, int partNumber, Stream headerStream, CancellationToken cancellationToken = default)
public Task AddNewJobPartAsync(string transferId, int partNumber, JobPartPlanHeader header, CancellationToken cancellationToken = default)
{
return Task.CompletedTask;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Task AddNewJobAsync(
Task AddNewJobPartAsync(
string transferId,
int partNumber,
Stream headerStream,
JobPartPlanHeader header,
CancellationToken cancellationToken = default);

Task<bool> IsEnumerationCompleteAsync(
Expand Down
15 changes: 5 additions & 10 deletions sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -473,16 +473,11 @@ public async virtual Task CleanupAbortedJobPartAsync()
/// </summary>
public async virtual Task AddJobPartToCheckpointerAsync()
{
JobPartPlanHeader header = this.ToJobPartPlanHeader();
using (Stream stream = new MemoryStream())
{
header.Serialize(stream);
await _checkpointer.AddNewJobPartAsync(
transferId: _dataTransfer.Id,
partNumber: PartNumber,
headerStream: stream,
cancellationToken: _cancellationToken).ConfigureAwait(false);
}
await _checkpointer.AddNewJobPartAsync(
transferId: _dataTransfer.Id,
partNumber: PartNumber,
header: this.ToJobPartPlanHeader(),
cancellationToken: _cancellationToken).ConfigureAwait(false);
}

internal async virtual Task SetCheckpointerStatusAsync()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,21 @@ public static async Task<JobPartPlanFile> CreateJobPartPlanFileAsync(
string checkpointerPath,
string id,
int jobPart,
Stream headerStream,
JobPartPlanHeader header,
CancellationToken cancellationToken = default)
{
Argument.AssertNotNullOrEmpty(checkpointerPath, nameof(checkpointerPath));
Argument.AssertNotNullOrEmpty(id, nameof(id));
Argument.AssertNotNull(jobPart, nameof(jobPart));
Argument.AssertNotNull(headerStream, nameof(headerStream));
Argument.AssertNotNull(header, nameof(header));

JobPartPlanFileName fileName = new JobPartPlanFileName(checkpointerPath: checkpointerPath, id: id, jobPartNumber: jobPart);
return await CreateJobPartPlanFileAsync(fileName, headerStream, cancellationToken).ConfigureAwait(false);
return await CreateJobPartPlanFileAsync(fileName, header, cancellationToken).ConfigureAwait(false);
}

public static async Task<JobPartPlanFile> CreateJobPartPlanFileAsync(
JobPartPlanFileName fileName,
Stream headerStream,
JobPartPlanHeader header,
CancellationToken cancellationToken = default)
{
JobPartPlanFile result = new JobPartPlanFile()
Expand All @@ -63,8 +63,11 @@ public static async Task<JobPartPlanFile> CreateJobPartPlanFileAsync(
try
{
using (FileStream fileStream = File.Create(result.FileName.ToString()))
using (MemoryStream ms = new())
{
await headerStream.CopyToAsync(
header.Serialize(ms);
ms.Position = 0;
await ms.CopyToAsync(
fileStream,
DataMovementConstants.DefaultStreamCopyBufferSize,
cancellationToken).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,13 @@ public override async Task AddNewJobAsync(
public override async Task AddNewJobPartAsync(
string transferId,
int partNumber,
Stream headerStream,
JobPartPlanHeader header,
CancellationToken cancellationToken = default)
{
Argument.AssertNotNullOrEmpty(transferId, nameof(transferId));
Argument.AssertNotNull(partNumber, nameof(partNumber));
Argument.AssertNotNull(headerStream, nameof(headerStream));
Argument.AssertNotNull(header, nameof(header));
CancellationHelper.ThrowIfCancellationRequested(cancellationToken);
headerStream.Position = 0;

if (!_transferStates.ContainsKey(transferId))
{
Expand All @@ -124,7 +123,7 @@ public override async Task AddNewJobPartAsync(
_pathToCheckpointer,
transferId,
partNumber,
headerStream,
header,
cancellationToken).ConfigureAwait(false);

// Add the job part into the current state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,15 @@ public abstract Task AddNewJobAsync(
/// </summary>
/// <param name="transferId">The transfer ID.</param>
/// <param name="partNumber">The job part number.</param>
/// <param name="headerStream">A <see cref="Stream"/> to the job part plan header.</param>
/// <param name="header">A <see cref="Stream"/> to the job part plan header.</param>
/// <param name="cancellationToken">
/// Optional <see cref="CancellationToken"/> to propagate
/// notifications that the operation should be canceled.
/// </param>
public abstract Task AddNewJobPartAsync(
string transferId,
int partNumber,
Stream headerStream,
JobPartPlanHeader header,
CancellationToken cancellationToken = default);

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ namespace Azure.Storage.DataMovement
public class TransferItemFailedEventArgs : DataTransferEventArgs
{
/// <summary>
/// Gets the <see cref="StorageResourceItem"/> that was the source resource for the transfer.
/// Gets the <see cref="StorageResource"/> that was the source resource for the transfer.
/// </summary>
public StorageResourceItem SourceResource { get; }
public StorageResource SourceResource { get; }

/// <summary>
/// Gets the <see cref="StorageResourceItem"/> that was the destination resource for the transfer.
/// Gets the <see cref="StorageResource"/> that was the destination resource for the transfer.
/// </summary>
public StorageResourceItem DestinationResource { get; }
public StorageResource DestinationResource { get; }

/// <summary>
/// Gets the <see cref="Exception"/> that was thrown during the job.
Expand Down Expand Up @@ -53,8 +53,8 @@ public class TransferItemFailedEventArgs : DataTransferEventArgs
/// </exception>
public TransferItemFailedEventArgs(
string transferId,
StorageResourceItem sourceResource,
StorageResourceItem destinationResource,
StorageResource sourceResource,
StorageResource destinationResource,
Exception exception,
bool isRunningSynchronously,
CancellationToken cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,8 +493,8 @@ ex is not TaskCanceledException &&
await TransferFailedEventHandler.RaiseAsync(
new TransferItemFailedEventArgs(
_dataTransfer.Id,
_sourceResource,
_destinationResource,
(StorageResource)_sourceResource ?? _sourceResourceContainer,
(StorageResource)_destinationResource ?? _destinationResourceContainer,
ex,
false,
_cancellationToken),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,20 @@ public async Task CreateJobPartPlanFileAsync_Base()
checkpointerPath: test.DirectoryPath,
id: transferId,
jobPart: jobPart,
headerStream: stream);
header: new(
DataMovementConstants.JobPartPlanFile.SchemaVersion_b3,
transferId,
jobPart,
System.DateTimeOffset.Now,
"mock",
"mock",
"mock",
"mock",
default,
default,
default,
default,
new()));
}

JobPartPlanFileName fileName = new JobPartPlanFileName(
Expand Down Expand Up @@ -63,7 +76,20 @@ public async Task CreateJobPartPlanFileAsync_FileName()
{
file = await JobPartPlanFile.CreateJobPartPlanFileAsync(
fileName: fileName,
headerStream: stream);
header: new(
DataMovementConstants.JobPartPlanFile.SchemaVersion_b3,
transferId,
jobPart,
System.DateTimeOffset.Now,
"mock",
"mock",
"mock",
"mock",
default,
default,
default,
default,
new()));
}

Assert.NotNull(file);
Expand Down
Loading

0 comments on commit 748c9c4

Please sign in to comment.