diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/Azure.Storage.DataMovement.Blobs.Tests.csproj b/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/Azure.Storage.DataMovement.Blobs.Tests.csproj index 43e0c3ca3d748..83e1264a0290a 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/Azure.Storage.DataMovement.Blobs.Tests.csproj +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/Azure.Storage.DataMovement.Blobs.Tests.csproj @@ -41,6 +41,7 @@ + diff --git a/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.net6.0.cs b/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.net6.0.cs index 128331d09d4b6..6df462c6c90b7 100644 --- a/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.net6.0.cs +++ b/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.net6.0.cs @@ -237,10 +237,10 @@ public partial class TransferItemCompletedEventArgs : Azure.Storage.DataMovement } public partial class TransferItemFailedEventArgs : Azure.Storage.DataMovement.DataTransferEventArgs { - public TransferItemFailedEventArgs(string transferId, Azure.Storage.DataMovement.StorageResourceItem sourceResource, Azure.Storage.DataMovement.StorageResourceItem destinationResource, System.Exception exception, bool isRunningSynchronously, System.Threading.CancellationToken cancellationToken) : base (default(string), default(bool), default(System.Threading.CancellationToken)) { } - public Azure.Storage.DataMovement.StorageResourceItem DestinationResource { get { throw null; } } + public TransferItemFailedEventArgs(string transferId, Azure.Storage.DataMovement.StorageResource sourceResource, Azure.Storage.DataMovement.StorageResource destinationResource, System.Exception exception, bool isRunningSynchronously, System.Threading.CancellationToken cancellationToken) : base (default(string), default(bool), default(System.Threading.CancellationToken)) { } + public Azure.Storage.DataMovement.StorageResource DestinationResource { get { throw null; } } public System.Exception Exception { get { throw null; } } - public Azure.Storage.DataMovement.StorageResourceItem SourceResource { get { throw null; } } + public Azure.Storage.DataMovement.StorageResource SourceResource { get { throw null; } } } public partial class TransferItemSkippedEventArgs : Azure.Storage.DataMovement.DataTransferEventArgs { diff --git a/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.netstandard2.0.cs b/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.netstandard2.0.cs index 128331d09d4b6..6df462c6c90b7 100644 --- a/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.netstandard2.0.cs +++ b/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.netstandard2.0.cs @@ -237,10 +237,10 @@ public partial class TransferItemCompletedEventArgs : Azure.Storage.DataMovement } public partial class TransferItemFailedEventArgs : Azure.Storage.DataMovement.DataTransferEventArgs { - public TransferItemFailedEventArgs(string transferId, Azure.Storage.DataMovement.StorageResourceItem sourceResource, Azure.Storage.DataMovement.StorageResourceItem destinationResource, System.Exception exception, bool isRunningSynchronously, System.Threading.CancellationToken cancellationToken) : base (default(string), default(bool), default(System.Threading.CancellationToken)) { } - public Azure.Storage.DataMovement.StorageResourceItem DestinationResource { get { throw null; } } + public TransferItemFailedEventArgs(string transferId, Azure.Storage.DataMovement.StorageResource sourceResource, Azure.Storage.DataMovement.StorageResource destinationResource, System.Exception exception, bool isRunningSynchronously, System.Threading.CancellationToken cancellationToken) : base (default(string), default(bool), default(System.Threading.CancellationToken)) { } + public Azure.Storage.DataMovement.StorageResource DestinationResource { get { throw null; } } public System.Exception Exception { get { throw null; } } - public Azure.Storage.DataMovement.StorageResourceItem SourceResource { get { throw null; } } + public Azure.Storage.DataMovement.StorageResource SourceResource { get { throw null; } } } public partial class TransferItemSkippedEventArgs : Azure.Storage.DataMovement.DataTransferEventArgs { diff --git a/sdk/storage/Azure.Storage.DataMovement/src/DisabledTransferCheckpointer.cs b/sdk/storage/Azure.Storage.DataMovement/src/DisabledTransferCheckpointer.cs index 6bacc6865137a..507321d5e018d 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/DisabledTransferCheckpointer.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/DisabledTransferCheckpointer.cs @@ -16,7 +16,7 @@ public Task AddNewJobAsync(string transferId, StorageResource source, StorageRes return Task.CompletedTask; } - public Task AddNewJobPartAsync(string transferId, int partNumber, Stream headerStream, CancellationToken cancellationToken = default) + public Task AddNewJobPartAsync(string transferId, int partNumber, JobPartPlanHeader header, CancellationToken cancellationToken = default) { return Task.CompletedTask; } diff --git a/sdk/storage/Azure.Storage.DataMovement/src/ITransferCheckpointer.cs b/sdk/storage/Azure.Storage.DataMovement/src/ITransferCheckpointer.cs index 24705fb4ec6f1..2842ba58caf5c 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/ITransferCheckpointer.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/ITransferCheckpointer.cs @@ -20,7 +20,7 @@ Task AddNewJobAsync( Task AddNewJobPartAsync( string transferId, int partNumber, - Stream headerStream, + JobPartPlanHeader header, CancellationToken cancellationToken = default); Task IsEnumerationCompleteAsync( diff --git a/sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs b/sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs index 637d5baf5b24d..d3b74e0a0ef0e 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs @@ -473,16 +473,11 @@ public async virtual Task CleanupAbortedJobPartAsync() /// public async virtual Task AddJobPartToCheckpointerAsync() { - JobPartPlanHeader header = this.ToJobPartPlanHeader(); - using (Stream stream = new MemoryStream()) - { - header.Serialize(stream); - await _checkpointer.AddNewJobPartAsync( - transferId: _dataTransfer.Id, - partNumber: PartNumber, - headerStream: stream, - cancellationToken: _cancellationToken).ConfigureAwait(false); - } + await _checkpointer.AddNewJobPartAsync( + transferId: _dataTransfer.Id, + partNumber: PartNumber, + header: this.ToJobPartPlanHeader(), + cancellationToken: _cancellationToken).ConfigureAwait(false); } internal async virtual Task SetCheckpointerStatusAsync() diff --git a/sdk/storage/Azure.Storage.DataMovement/src/JobPlan/JobPartPlanFile.cs b/sdk/storage/Azure.Storage.DataMovement/src/JobPlan/JobPartPlanFile.cs index 55eb6d9854aca..4db37b38ac5b4 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/JobPlan/JobPartPlanFile.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/JobPlan/JobPartPlanFile.cs @@ -38,21 +38,21 @@ public static async Task CreateJobPartPlanFileAsync( string checkpointerPath, string id, int jobPart, - Stream headerStream, + JobPartPlanHeader header, CancellationToken cancellationToken = default) { Argument.AssertNotNullOrEmpty(checkpointerPath, nameof(checkpointerPath)); Argument.AssertNotNullOrEmpty(id, nameof(id)); Argument.AssertNotNull(jobPart, nameof(jobPart)); - Argument.AssertNotNull(headerStream, nameof(headerStream)); + Argument.AssertNotNull(header, nameof(header)); JobPartPlanFileName fileName = new JobPartPlanFileName(checkpointerPath: checkpointerPath, id: id, jobPartNumber: jobPart); - return await CreateJobPartPlanFileAsync(fileName, headerStream, cancellationToken).ConfigureAwait(false); + return await CreateJobPartPlanFileAsync(fileName, header, cancellationToken).ConfigureAwait(false); } public static async Task CreateJobPartPlanFileAsync( JobPartPlanFileName fileName, - Stream headerStream, + JobPartPlanHeader header, CancellationToken cancellationToken = default) { JobPartPlanFile result = new JobPartPlanFile() @@ -63,8 +63,11 @@ public static async Task CreateJobPartPlanFileAsync( try { using (FileStream fileStream = File.Create(result.FileName.ToString())) + using (MemoryStream ms = new()) { - await headerStream.CopyToAsync( + header.Serialize(ms); + ms.Position = 0; + await ms.CopyToAsync( fileStream, DataMovementConstants.DefaultStreamCopyBufferSize, cancellationToken).ConfigureAwait(false); diff --git a/sdk/storage/Azure.Storage.DataMovement/src/LocalTransferCheckpointer.cs b/sdk/storage/Azure.Storage.DataMovement/src/LocalTransferCheckpointer.cs index 7034c19ebb414..5fc8ea75b616f 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/LocalTransferCheckpointer.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/LocalTransferCheckpointer.cs @@ -104,14 +104,13 @@ public override async Task AddNewJobAsync( public override async Task AddNewJobPartAsync( string transferId, int partNumber, - Stream headerStream, + JobPartPlanHeader header, CancellationToken cancellationToken = default) { Argument.AssertNotNullOrEmpty(transferId, nameof(transferId)); Argument.AssertNotNull(partNumber, nameof(partNumber)); - Argument.AssertNotNull(headerStream, nameof(headerStream)); + Argument.AssertNotNull(header, nameof(header)); CancellationHelper.ThrowIfCancellationRequested(cancellationToken); - headerStream.Position = 0; if (!_transferStates.ContainsKey(transferId)) { @@ -124,7 +123,7 @@ public override async Task AddNewJobPartAsync( _pathToCheckpointer, transferId, partNumber, - headerStream, + header, cancellationToken).ConfigureAwait(false); // Add the job part into the current state diff --git a/sdk/storage/Azure.Storage.DataMovement/src/SerializerTransferCheckpointer.cs b/sdk/storage/Azure.Storage.DataMovement/src/SerializerTransferCheckpointer.cs index e43597d5d543a..323aff4e6c0ea 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/SerializerTransferCheckpointer.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/SerializerTransferCheckpointer.cs @@ -45,7 +45,7 @@ public abstract Task AddNewJobAsync( /// /// The transfer ID. /// The job part number. - /// A to the job part plan header. + /// A to the job part plan header. /// /// Optional to propagate /// notifications that the operation should be canceled. @@ -53,7 +53,7 @@ public abstract Task AddNewJobAsync( public abstract Task AddNewJobPartAsync( string transferId, int partNumber, - Stream headerStream, + JobPartPlanHeader header, CancellationToken cancellationToken = default); /// diff --git a/sdk/storage/Azure.Storage.DataMovement/src/TransferItemFailedEventArgs.cs b/sdk/storage/Azure.Storage.DataMovement/src/TransferItemFailedEventArgs.cs index 918e90e8583a5..bec7dc174d045 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/TransferItemFailedEventArgs.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/TransferItemFailedEventArgs.cs @@ -13,14 +13,14 @@ namespace Azure.Storage.DataMovement public class TransferItemFailedEventArgs : DataTransferEventArgs { /// - /// Gets the that was the source resource for the transfer. + /// Gets the that was the source resource for the transfer. /// - public StorageResourceItem SourceResource { get; } + public StorageResource SourceResource { get; } /// - /// Gets the that was the destination resource for the transfer. + /// Gets the that was the destination resource for the transfer. /// - public StorageResourceItem DestinationResource { get; } + public StorageResource DestinationResource { get; } /// /// Gets the that was thrown during the job. @@ -53,8 +53,8 @@ public class TransferItemFailedEventArgs : DataTransferEventArgs /// public TransferItemFailedEventArgs( string transferId, - StorageResourceItem sourceResource, - StorageResourceItem destinationResource, + StorageResource sourceResource, + StorageResource destinationResource, Exception exception, bool isRunningSynchronously, CancellationToken cancellationToken) diff --git a/sdk/storage/Azure.Storage.DataMovement/src/TransferJobInternal.cs b/sdk/storage/Azure.Storage.DataMovement/src/TransferJobInternal.cs index 1f2339b1c05a6..6938d460072d3 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/TransferJobInternal.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/TransferJobInternal.cs @@ -493,8 +493,8 @@ ex is not TaskCanceledException && await TransferFailedEventHandler.RaiseAsync( new TransferItemFailedEventArgs( _dataTransfer.Id, - _sourceResource, - _destinationResource, + (StorageResource)_sourceResource ?? _sourceResourceContainer, + (StorageResource)_destinationResource ?? _destinationResourceContainer, ex, false, _cancellationToken), diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/JobPartPlanFileTests.cs b/sdk/storage/Azure.Storage.DataMovement/tests/JobPartPlanFileTests.cs index e687266f7cb6d..466a82c0afe1d 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/JobPartPlanFileTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/JobPartPlanFileTests.cs @@ -31,7 +31,20 @@ public async Task CreateJobPartPlanFileAsync_Base() checkpointerPath: test.DirectoryPath, id: transferId, jobPart: jobPart, - headerStream: stream); + header: new( + DataMovementConstants.JobPartPlanFile.SchemaVersion_b3, + transferId, + jobPart, + System.DateTimeOffset.Now, + "mock", + "mock", + "mock", + "mock", + default, + default, + default, + default, + new())); } JobPartPlanFileName fileName = new JobPartPlanFileName( @@ -63,7 +76,20 @@ public async Task CreateJobPartPlanFileAsync_FileName() { file = await JobPartPlanFile.CreateJobPartPlanFileAsync( fileName: fileName, - headerStream: stream); + header: new( + DataMovementConstants.JobPartPlanFile.SchemaVersion_b3, + transferId, + jobPart, + System.DateTimeOffset.Now, + "mock", + "mock", + "mock", + "mock", + default, + default, + default, + default, + new())); } Assert.NotNull(file); diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/LocalTransferCheckpointerTests.cs b/sdk/storage/Azure.Storage.DataMovement/tests/LocalTransferCheckpointerTests.cs index 6b9d669ff2321..92dd42d40809a 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/LocalTransferCheckpointerTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/LocalTransferCheckpointerTests.cs @@ -205,16 +205,11 @@ public async Task AddNewJobPartAsync() await AddJobToCheckpointer(transferCheckpointer, transferId); - using (Stream stream = new MemoryStream()) - { - header.Serialize(stream); - - // Act - await transferCheckpointer.AddNewJobPartAsync( - transferId: transferId, - partNumber: partNumber, - headerStream: stream); - } + // Act + await transferCheckpointer.AddNewJobPartAsync( + transferId: transferId, + partNumber: partNumber, + header: header); // Assert List transferIds = await transferCheckpointer.GetStoredTransfersAsync(); @@ -242,22 +237,17 @@ public async Task AddNewJobPartAsync_Error() await AddJobToCheckpointer(transferCheckpointer, transferId); - using (Stream stream = new MemoryStream()) - { - header.Serialize(stream); - - await transferCheckpointer.AddNewJobPartAsync( - transferId: transferId, - partNumber: partNumber, - headerStream: stream); + await transferCheckpointer.AddNewJobPartAsync( + transferId: transferId, + partNumber: partNumber, + header: header); - // Add the same job part twice - Assert.CatchAsync( - async () => await transferCheckpointer.AddNewJobPartAsync( - transferId: transferId, - partNumber: partNumber, - headerStream: stream)); - } + // Add the same job part twice + Assert.CatchAsync( + async () => await transferCheckpointer.AddNewJobPartAsync( + transferId: transferId, + partNumber: partNumber, + header: header)); // Assert List transferIds = await transferCheckpointer.GetStoredTransfersAsync(); @@ -292,42 +282,22 @@ public async Task AddNewJobPartAsync_MultipleParts() await AddJobToCheckpointer(transferCheckpointer, transferId); - using (Stream stream = new MemoryStream()) - { - header1.Serialize(stream); - - await transferCheckpointer.AddNewJobPartAsync( - transferId: transferId, - partNumber: 0, - headerStream: stream); - } - using (Stream stream = new MemoryStream()) - { - header2.Serialize(stream); - - await transferCheckpointer.AddNewJobPartAsync( - transferId: transferId, - partNumber: 1, - headerStream: stream); - } - using (Stream stream = new MemoryStream()) - { - header3.Serialize(stream); - - await transferCheckpointer.AddNewJobPartAsync( - transferId: transferId, - partNumber: 2, - headerStream: stream); - } - using (Stream stream = new MemoryStream()) - { - header4.Serialize(stream); - - await transferCheckpointer.AddNewJobPartAsync( - transferId: transferId, - partNumber: 3, - headerStream: stream); - } + await transferCheckpointer.AddNewJobPartAsync( + transferId: transferId, + partNumber: 0, + header: header1); + await transferCheckpointer.AddNewJobPartAsync( + transferId: transferId, + partNumber: 1, + header: header2); + await transferCheckpointer.AddNewJobPartAsync( + transferId: transferId, + partNumber: 2, + header: header3); + await transferCheckpointer.AddNewJobPartAsync( + transferId: transferId, + partNumber: 3, + header: header4); // Assert List transferIds = await transferCheckpointer.GetStoredTransfersAsync(); @@ -353,26 +323,16 @@ public async Task AddNewJobPartAsync_AddAfterRemove() await AddJobToCheckpointer(transferCheckpointer, transferId); - using (Stream stream = new MemoryStream()) - { - header.Serialize(stream); - - await transferCheckpointer.AddNewJobPartAsync( - transferId: transferId, - partNumber: 1, - headerStream: stream); - } + await transferCheckpointer.AddNewJobPartAsync( + transferId: transferId, + partNumber: 1, + header: header); await transferCheckpointer.TryRemoveStoredTransferAsync(transferId); await AddJobToCheckpointer(transferCheckpointer, transferId); - using (Stream stream = new MemoryStream()) - { - header.Serialize(stream); - - await transferCheckpointer.AddNewJobPartAsync( - transferId: transferId, - partNumber: 1, - headerStream: stream); - } + await transferCheckpointer.AddNewJobPartAsync( + transferId: transferId, + partNumber: 1, + header: header); // Assert List transferIds = await transferCheckpointer.GetStoredTransfersAsync(); @@ -560,15 +520,10 @@ public async Task CurrentJobPartCountAsync_OneJob() await AddJobToCheckpointer(transferCheckpointer, transferId); - using (Stream stream = new MemoryStream()) - { - header.Serialize(stream); - - await transferCheckpointer.AddNewJobPartAsync( - transferId: transferId, - partNumber: partNumber, - headerStream: stream); - } + await transferCheckpointer.AddNewJobPartAsync( + transferId: transferId, + partNumber: partNumber, + header: header); // Act int partCount = await transferCheckpointer.CurrentJobPartCountAsync(transferId); @@ -601,42 +556,22 @@ public async Task CurrentJobPartCountAsync_MultipleJobs() await AddJobToCheckpointer(transferCheckpointer, transferId); - using (Stream stream = new MemoryStream()) - { - header1.Serialize(stream); - - await transferCheckpointer.AddNewJobPartAsync( - transferId: transferId, - partNumber: 0, - headerStream: stream); - } - using (Stream stream = new MemoryStream()) - { - header2.Serialize(stream); - - await transferCheckpointer.AddNewJobPartAsync( - transferId: transferId, - partNumber: 1, - headerStream: stream); - } - using (Stream stream = new MemoryStream()) - { - header3.Serialize(stream); - - await transferCheckpointer.AddNewJobPartAsync( - transferId: transferId, - partNumber: 2, - headerStream: stream); - } - using (Stream stream = new MemoryStream()) - { - header4.Serialize(stream); - - await transferCheckpointer.AddNewJobPartAsync( - transferId: transferId, - partNumber: 3, - headerStream: stream); - } + await transferCheckpointer.AddNewJobPartAsync( + transferId: transferId, + partNumber: 0, + header: header1); + await transferCheckpointer.AddNewJobPartAsync( + transferId: transferId, + partNumber: 1, + header: header2); + await transferCheckpointer.AddNewJobPartAsync( + transferId: transferId, + partNumber: 2, + header: header3); + await transferCheckpointer.AddNewJobPartAsync( + transferId: transferId, + partNumber: 3, + header: header4); // Act int partCount = await transferCheckpointer.CurrentJobPartCountAsync(transferId); @@ -730,15 +665,10 @@ public async Task ReadJobPartPlanFileAsync() SerializerTransferCheckpointer transferCheckpointer = new LocalTransferCheckpointer(test.DirectoryPath); await AddJobToCheckpointer(transferCheckpointer, transferId); - using (MemoryStream stream = new MemoryStream()) - { - header.Serialize(stream); - - await transferCheckpointer.AddNewJobPartAsync( - transferId: transferId, - partNumber: partNumber, - headerStream: stream); - } + await transferCheckpointer.AddNewJobPartAsync( + transferId: transferId, + partNumber: partNumber, + header: header); // Act await transferCheckpointer.AssertJobPlanHeaderAsync(transferId, partNumber, header); @@ -891,15 +821,10 @@ public async Task SetJobPartTransferStatusAsync() SerializerTransferCheckpointer transferCheckpointer = new LocalTransferCheckpointer(test.DirectoryPath); await AddJobToCheckpointer(transferCheckpointer, transferId); - using (MemoryStream stream = new MemoryStream()) - { - header.Serialize(stream); - - await transferCheckpointer.AddNewJobPartAsync( - transferId: transferId, - partNumber: partNumber, - headerStream: stream); - } + await transferCheckpointer.AddNewJobPartAsync( + transferId: transferId, + partNumber: partNumber, + header: header); // Act await transferCheckpointer.SetJobPartTransferStatusAsync(transferId, partNumber, newStatus); diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/Shared/MemoryTransferCheckpointer.cs b/sdk/storage/Azure.Storage.DataMovement/tests/Shared/MemoryTransferCheckpointer.cs new file mode 100644 index 0000000000000..c3b3967e220c2 --- /dev/null +++ b/sdk/storage/Azure.Storage.DataMovement/tests/Shared/MemoryTransferCheckpointer.cs @@ -0,0 +1,178 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Azure.Storage.DataMovement.JobPlan; + +namespace Azure.Storage.DataMovement.Tests; + +internal class MemoryTransferCheckpointer : ITransferCheckpointer +{ + public class Job + { + public string TransferId { get; set; } + public DateTimeOffset CreationTime { get; set; } + public JobPlanOperation Operation { get; set; } + public DataTransferStatus Status { get; set; } + public StorageResource Source { get; set; } + public StorageResource Destination { get; set; } + public bool EnumerationComplete { get; set; } + public Dictionary Parts { get; set; } = new(); + } + public class JobPart + { + public DataTransferStatus Status { get; set; } + private JobPartPlanHeader _plan; + public JobPartPlanHeader Plan + { + // do this conversion because no deep copy implemented currently + get + { + if (_plan == null) + { + return null; + } + using MemoryStream ms = new(); + _plan.Serialize(ms); + return JobPartPlanHeader.Deserialize(ms); + } + set + { + if (value == null) + { + _plan = null; + } + using MemoryStream ms = new(); + value.Serialize(ms); + _plan = JobPartPlanHeader.Deserialize(ms); + } + } + } + + private Dictionary Jobs { get; set; } = new(); + + public Task AddNewJobAsync(string transferId, StorageResource source, StorageResource destination, CancellationToken cancellationToken = default) + { + if (Jobs.ContainsKey(transferId)) + { + throw new Exception($"Job {transferId} already exists."); + } + Jobs[transferId] = new Job + { + TransferId = transferId, + CreationTime = DateTimeOffset.Now, + Operation = JobPlanOperation.Upload, // TODO + Source = source, + Destination = destination + }; + return Task.CompletedTask; + } + + public Task AddNewJobPartAsync(string transferId, int partNumber, JobPartPlanHeader header, CancellationToken cancellationToken = default) + { + if (!Jobs.TryGetValue(transferId, out Job job)) + { + throw new Exception("Job does not exist."); + } + if (job.Parts.ContainsKey(partNumber)) + { + throw new Exception($"Job part {partNumber} already exists for job {job.TransferId}."); + } + job.Parts.Add(partNumber, new() + { + Plan = header, + }); + return Task.CompletedTask; + } + + public Task GetCurrentJobPartCountAsync(string transferId, CancellationToken cancellationToken = default) + { + return Task.FromResult(Jobs.TryGetValue(transferId, out Job job) ? job.Parts.Count : 0); + } + + public Task GetDataTransferPropertiesAsync(string transferId, CancellationToken cancellationToken = default) + { + if (!Jobs.TryGetValue(transferId, out Job job)) + { + throw new Exception("Job does not exist."); + } + return Task.FromResult(new DataTransferProperties() + { + TransferId = job.TransferId, + SourceUri = job.Source.Uri, + SourceProviderId = job.Source.ProviderId, + DestinationUri = job.Destination.Uri, + DestinationProviderId = job.Destination.ProviderId, + IsContainer = job.Source.IsContainer, + }); + } + + public Task GetJobPartAsync(string transferId, int partNumber, CancellationToken cancellationToken = default) + { + if (!Jobs.TryGetValue(transferId, out Job job)) + { + throw new Exception("Job does not exist."); + } + if (job.Parts.TryGetValue(partNumber, out JobPart part)) + { + throw new Exception($"Job part {partNumber} already exists for job {job.TransferId}."); + } + return Task.FromResult(part.Plan); + } + + public Task GetJobStatusAsync(string transferId, CancellationToken cancellationToken = default) + { + if (!Jobs.TryGetValue(transferId, out Job job)) + { + throw new Exception("Job does not exist."); + } + return Task.FromResult(job.Status.DeepCopy()); + } + + public Task> GetStoredTransfersAsync(CancellationToken cancellationToken = default) + { + return Task.FromResult(Jobs.Keys.ToList()); + } + + public Task IsEnumerationCompleteAsync(string transferId, CancellationToken cancellationToken = default) + { + return Task.FromResult(Jobs.TryGetValue(transferId, out Job job) && job.EnumerationComplete); + } + + public Task SetEnumerationCompleteAsync(string transferId, CancellationToken cancellationToken = default) + { + if (Jobs.TryGetValue(transferId, out Job job)) + { + job.EnumerationComplete = true; + } + return Task.CompletedTask; + } + + public Task SetJobPartStatusAsync(string transferId, int partNumber, DataTransferStatus status, CancellationToken cancellationToken = default) + { + if (Jobs.TryGetValue(transferId, out Job job) && job.Parts.TryGetValue(partNumber, out JobPart part)) + { + part.Status = status.DeepCopy(); + } + return Task.CompletedTask; + } + + public Task SetJobStatusAsync(string transferId, DataTransferStatus status, CancellationToken cancellationToken = default) + { + if (Jobs.TryGetValue(transferId, out Job job)) + { + job.Status = status.DeepCopy(); + } + return Task.CompletedTask; + } + + public Task TryRemoveStoredTransferAsync(string transferId, CancellationToken cancellationToken = default) + { + return Task.FromResult(Jobs.Remove(transferId)); + } +}