Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Storage][DataMovement] Update checkpointer to read/write to job file - Part 2 #39101

Merged
merged 5 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,21 @@ public override int Read(byte[] buffer, int offset, int count)
return read;
}

public override int ReadByte()
{
if (Position >= Length)
{
return -1;
}

(byte[] currentBuffer, int _, long offsetOfBuffer) = GetBufferFromPosition();

byte result = currentBuffer[Position - offsetOfBuffer];
Position += 1;

return result;
}

/// <summary>
/// According the the current <see cref="Position"/> of the stream, gets the correct buffer containing the byte
/// at that position, as well as the stream position represented by the start of the array.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,56 @@ public async Task WriteStream(int dataSize, int bufferPartitionSize)
Assert.AreEqual(0, pooledMemoryStream.Position);
}

[TestCase(1, 0, 1)]
[TestCase(Constants.KB, 512, 2 * Constants.KB)]
[TestCase(Constants.KB, 512, 512)]
[TestCase(107, 99, 52)]
public async Task ReadByte(int dataSize, int initialReadSize, int bufferPartitionSize)
{
// Arrange
byte[] originalData = GetRandomBuffer(dataSize);
PooledMemoryStream pooledMemoryStream = new PooledMemoryStream(ArrayPool<byte>.Shared, bufferPartitionSize);
await pooledMemoryStream.WriteAsync(originalData, 0, dataSize);
pooledMemoryStream.Position = 0;

// Read some data initially to test boundary conditions with buffers
if (initialReadSize > 0)
{
byte[] readData = new byte[initialReadSize];
await pooledMemoryStream.ReadAsync(readData, 0, initialReadSize);
}

// Act
byte result = Convert.ToByte(pooledMemoryStream.ReadByte());

// Assert
Assert.AreEqual(initialReadSize + 1, pooledMemoryStream.Position);
Assert.AreEqual(originalData[initialReadSize], result);
}

[TestCase(Constants.KB, 2 * Constants.KB)]
[TestCase(Constants.KB, 512)]
[TestCase(107, 52)]
public async Task ReadByte_Full(int dataSize, int bufferPartitionSize)
{
// Arrange
byte[] originalData = GetRandomBuffer(dataSize);
PooledMemoryStream pooledMemoryStream = new PooledMemoryStream(ArrayPool<byte>.Shared, bufferPartitionSize);
await pooledMemoryStream.WriteAsync(originalData, 0, dataSize);
pooledMemoryStream.Position = 0;

// Act
byte[] result = new byte[originalData.Length];
for (int i = 0; i < originalData.Length; i++)
{
result[i] = Convert.ToByte(pooledMemoryStream.ReadByte());
}

// Assert
Assert.AreEqual(originalData.Length, pooledMemoryStream.Position);
AssertSequenceEqual(originalData, result);
}

private static byte[] GetRandomBuffer(long size)
{
Random random = new Random(Environment.TickCount);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -64,5 +65,35 @@ internal static async Task<DataTransferProperties> GetDataTransferPropertiesAsyn
IsContainer = isContainer,
};
}

internal static async Task<bool> IsEnumerationCompleteAsync(
this TransferCheckpointer checkpointer,
string transferId,
CancellationToken cancellationToken)
{
using (Stream stream = await checkpointer.ReadJobPlanFileAsync(
transferId,
DataMovementConstants.JobPlanFile.EnumerationCompleteIndex,
DataMovementConstants.OneByte,
cancellationToken).ConfigureAwait(false))
{
return Convert.ToBoolean(stream.ReadByte());
}
}

internal static async Task OnEnumerationCompleteAsync(
this TransferCheckpointer checkpointer,
string transferId,
CancellationToken cancellationToken)
{
byte[] enumerationComplete = { Convert.ToByte(true) };
await checkpointer.WriteToJobPlanFileAsync(
transferId,
DataMovementConstants.JobPlanFile.EnumerationCompleteIndex,
enumerationComplete,
bufferOffset: 0,
length: 1,
cancellationToken).ConfigureAwait(false);
}
}
}
15 changes: 2 additions & 13 deletions sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,6 @@ internal abstract class JobPartInternal
/// </summary>
internal long? Length;

/// <summary>
/// Defines whether or not this was the final part in the list call. This would determine
/// whether or not we needed to keep listing in the job.
/// </summary>
public bool IsFinalPart { get; internal set; }

internal ClientDiagnostics ClientDiagnostics { get; }

/// <summary>
Expand Down Expand Up @@ -160,7 +154,6 @@ internal JobPartInternal(
TransferCheckpointer checkpointer,
TransferProgressTracker progressTracker,
ArrayPool<byte> arrayPool,
bool isFinalPart,
SyncAsyncEventHandler<TransferStatusEventArgs> jobPartEventHandler,
SyncAsyncEventHandler<TransferStatusEventArgs> statusEventHandler,
SyncAsyncEventHandler<TransferItemFailedEventArgs> failedEventHandler,
Expand All @@ -186,7 +179,6 @@ internal JobPartInternal(
_progressTracker = progressTracker;
_cancellationToken = cancellationToken;
_arrayPool = arrayPool;
IsFinalPart = isFinalPart;
PartTransferStatusEventHandler = jobPartEventHandler;
TransferStatusEventHandler = statusEventHandler;
TransferFailedEventHandler = failedEventHandler;
Expand Down Expand Up @@ -460,13 +452,10 @@ public async virtual Task CleanupAbortedJobPartAsync()
/// Serializes the respective job part and adds it to the checkpointer.
/// </summary>
/// <param name="chunksTotal">Number of chunks in the job part.</param>
/// <param name="isFinalPart">Defines if this part is the last job part of the job.</param>
/// <returns></returns>
public async virtual Task AddJobPartToCheckpointerAsync(int chunksTotal, bool isFinalPart)
public async virtual Task AddJobPartToCheckpointerAsync(int chunksTotal)
{
JobPartPlanHeader header = this.ToJobPartPlanHeader(
jobStatus: JobPartStatus,
isFinalPart: isFinalPart);
JobPartPlanHeader header = this.ToJobPartPlanHeader(jobStatus: JobPartStatus);
using (Stream stream = new MemoryStream())
{
header.Serialize(stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ internal class ServiceToServiceJobPart : JobPartInternal, IAsyncDisposable
/// <summary>
/// Creating job part based on a single transfer job
/// </summary>
private ServiceToServiceJobPart(ServiceToServiceTransferJob job, int partNumber, bool isFinalPart)
private ServiceToServiceJobPart(ServiceToServiceTransferJob job, int partNumber)
: base(dataTransfer: job._dataTransfer,
partNumber: partNumber,
sourceResource: job._sourceResource,
Expand All @@ -35,7 +35,6 @@ private ServiceToServiceJobPart(ServiceToServiceTransferJob job, int partNumber,
checkpointer: job._checkpointer,
progressTracker: job._progressTracker,
arrayPool: job.UploadArrayPool,
isFinalPart: isFinalPart,
jobPartEventHandler: job.GetJobPartStatus(),
statusEventHandler: job.TransferStatusEventHandler,
failedEventHandler: job.TransferFailedEventHandler,
Expand All @@ -54,7 +53,6 @@ private ServiceToServiceJobPart(
int partNumber,
StorageResourceItem sourceResource,
StorageResourceItem destinationResource,
bool isFinalPart,
DataTransferStatus jobPartStatus = default,
long? length = default)
: base(dataTransfer: job._dataTransfer,
Expand All @@ -68,7 +66,6 @@ private ServiceToServiceJobPart(
checkpointer: job._checkpointer,
progressTracker: job._progressTracker,
arrayPool: job.UploadArrayPool,
isFinalPart: isFinalPart,
jobPartEventHandler: job.GetJobPartStatus(),
statusEventHandler: job.TransferStatusEventHandler,
failedEventHandler: job.TransferFailedEventHandler,
Expand All @@ -88,15 +85,11 @@ public async ValueTask DisposeAsync()

public static async Task<ServiceToServiceJobPart> CreateJobPartAsync(
ServiceToServiceTransferJob job,
int partNumber,
bool isFinalPart)
int partNumber)
{
// Create Job Part file as we're intializing the job part
ServiceToServiceJobPart part = new ServiceToServiceJobPart(
job: job,
partNumber: partNumber,
isFinalPart: isFinalPart);
await part.AddJobPartToCheckpointerAsync(1, isFinalPart).ConfigureAwait(false); // For now we only store 1 chunk
// Create Job Part file as we're initializing the job part
ServiceToServiceJobPart part = new ServiceToServiceJobPart(job, partNumber);
await part.AddJobPartToCheckpointerAsync(1).ConfigureAwait(false); // For now we only store 1 chunk
return part;
}

Expand All @@ -105,23 +98,21 @@ public static async Task<ServiceToServiceJobPart> CreateJobPartAsync(
int partNumber,
StorageResourceItem sourceResource,
StorageResourceItem destinationResource,
bool isFinalPart,
DataTransferStatus jobPartStatus = default,
long? length = default,
bool partPlanFileExists = false)
{
// Create Job Part file as we're intializing the job part
// Create Job Part file as we're initializing the job part
ServiceToServiceJobPart part = new ServiceToServiceJobPart(
job: job,
partNumber: partNumber,
jobPartStatus: jobPartStatus,
sourceResource: sourceResource,
destinationResource: destinationResource,
isFinalPart: isFinalPart,
length: length);
if (!partPlanFileExists)
{
await part.AddJobPartToCheckpointerAsync(1, isFinalPart).ConfigureAwait(false); // For now we only store 1 chunk
await part.AddJobPartToCheckpointerAsync(1).ConfigureAwait(false); // For now we only store 1 chunk
}
return part;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ public override async IAsyncEnumerable<JobPartInternal> ProcessJobToJobPartAsync
// Single resource transfer, we can skip to chunking the job.
part = await ServiceToServiceJobPart.CreateJobPartAsync(
job: this,
partNumber: partNumber,
isFinalPart: true).ConfigureAwait(false);
partNumber: partNumber).ConfigureAwait(false);
AppendJobPart(part);
}
catch (Exception ex)
Expand All @@ -105,30 +104,24 @@ public override async IAsyncEnumerable<JobPartInternal> ProcessJobToJobPartAsync
else
{
// Resuming old job with existing job parts
bool isFinalPartFound = false;
foreach (JobPartInternal part in _jobParts)
{
if (!part.JobPartStatus.HasCompletedSuccessfully)
{
part.JobPartStatus.TrySetTransferStateChange(DataTransferState.Queued);
yield return part;

if (part.IsFinalPart)
{
// If we found the final part then we don't have to relist the container.
isFinalPartFound = true;
}
}
}
if (!isFinalPartFound)

if (await _checkpointer.IsEnumerationCompleteAsync(_dataTransfer.Id, _cancellationToken).ConfigureAwait(false))
{
await foreach (JobPartInternal jobPartInternal in GetStorageResourcesAsync().ConfigureAwait(false))
{
yield return jobPartInternal;
}
}
}
_enumerationComplete = true;

await OnEnumerationComplete().ConfigureAwait(false);
}

Expand All @@ -154,10 +147,9 @@ private async IAsyncEnumerable<JobPartInternal> GetStorageResourcesAsync()
yield break;
}

// List the container keep track of the last job part in order to store it properly
// so we know we finished enumerating/listed.
// List the container in this specific way because MoveNext needs to be separately wrapped
// in a try/catch as we can't yield return inside a try/catch.
bool enumerationCompleted = false;
StorageResource lastResource = default;
while (!enumerationCompleted)
{
try
Expand All @@ -175,68 +167,36 @@ private async IAsyncEnumerable<JobPartInternal> GetStorageResourcesAsync()
}

StorageResource current = enumerator.Current;
if (lastResource != default)
{
string containerUriPath = _sourceResourceContainer.Uri.GetPath();
string sourceName = string.IsNullOrEmpty(containerUriPath)
? lastResource.Uri.GetPath()
: lastResource.Uri.GetPath().Substring(containerUriPath.Length + 1);

if (!existingSources.Contains(sourceName))
{
// Because AsyncEnumerable doesn't let us know which storage resource is the last resource
// we only yield return when we know this is not the last storage resource to be listed
// from the container.
ServiceToServiceJobPart part;
try
{
part = await ServiceToServiceJobPart.CreateJobPartAsync(
job: this,
partNumber: partNumber,
sourceResource: (StorageResourceItem)lastResource,
destinationResource: _destinationResourceContainer.GetStorageResourceReference(sourceName),
isFinalPart: false).ConfigureAwait(false);
AppendJobPart(part);
}
catch (Exception ex)
{
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
yield break;
}
yield return part;
partNumber++;
}
}
lastResource = current;
}
string containerUriPath = _sourceResourceContainer.Uri.GetPath();
string sourceName = string.IsNullOrEmpty(containerUriPath)
? current.Uri.GetPath()
: current.Uri.GetPath().Substring(containerUriPath.Length + 1);

// It's possible to have no job parts in a job
if (lastResource != default)
{
ServiceToServiceJobPart lastPart;
try
if (!existingSources.Contains(sourceName))
{
// Return last part but enable the part to be the last job part of the entire job
// so we know that we've finished listing in the container
string containerUriPath = _sourceResourceContainer.Uri.GetPath();
string lastSourceName = string.IsNullOrEmpty(containerUriPath)
? lastResource.Uri.GetPath()
: lastResource.Uri.GetPath().Substring(containerUriPath.Length + 1);

lastPart = await ServiceToServiceJobPart.CreateJobPartAsync(
// Because AsyncEnumerable doesn't let us know which storage resource is the last resource
// we only yield return when we know this is not the last storage resource to be listed
// from the container.
ServiceToServiceJobPart part;
try
{
part = await ServiceToServiceJobPart.CreateJobPartAsync(
job: this,
partNumber: partNumber,
sourceResource: (StorageResourceItem)lastResource,
destinationResource: _destinationResourceContainer.GetStorageResourceReference(lastSourceName),
isFinalPart: true).ConfigureAwait(false);
AppendJobPart(lastPart);
}
catch (Exception ex)
{
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
yield break;
sourceResource: (StorageResourceItem)current,
destinationResource: _destinationResourceContainer.GetStorageResourceReference(sourceName))
.ConfigureAwait(false);
AppendJobPart(part);
}
catch (Exception ex)
{
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
yield break;
}
yield return part;
partNumber++;
}
yield return lastPart;
}
}
}
Expand Down
Loading