[Storage][DataMovement] Refactor/add bounds to CommitChunkHandler #47417

Merged
3 commits merged on Dec 4, 2024
173 changes: 39 additions & 134 deletions sdk/storage/Azure.Storage.DataMovement/src/CommitChunkHandler.cs
@@ -2,20 +2,14 @@
 // Licensed under the MIT License.

 using System;
-using Azure.Core;
 using System.Threading.Tasks;
 using System.Threading;
-using System.Threading.Channels;
-using Azure.Core.Pipeline;
 using Azure.Storage.Common;

 namespace Azure.Storage.DataMovement
 {
     internal class CommitChunkHandler : IDisposable
     {
-        // Indicates whether the current thread is processing stage chunks.
-        private static Task _processStageChunkEvents;
-
         #region Delegate Definitions
         public delegate Task QueuePutBlockTaskInternal(long offset, long blockSize, long expectedLength, StorageResourceItemProperties properties);
         public delegate Task QueueCommitBlockTaskInternal(StorageResourceItemProperties sourceProperties);
@@ -36,29 +30,24 @@ public struct Behaviors
             public InvokeFailedEventHandlerInternal InvokeFailedHandler { get; set; }
         }

-        private event SyncAsyncEventHandler<StageChunkEventArgs> _commitBlockHandler;
-        internal SyncAsyncEventHandler<StageChunkEventArgs> GetCommitBlockHandler() => _commitBlockHandler;
-
         /// <summary>
-        /// Create channel of <see cref="StageChunkEventArgs"/> to keep track of that are
+        /// Create channel of <see cref="QueueStageChunkArgs"/> to keep track of chunks that are
         /// waiting to update the bytesTransferred and other required operations.
         /// </summary>
-        private readonly Channel<StageChunkEventArgs> _stageChunkChannel;
-        private CancellationToken _cancellationToken;
+        private readonly IProcessor<QueueStageChunkArgs> _stageChunkProcessor;
+        private readonly CancellationToken _cancellationToken;

         private long _bytesTransferred;
         private readonly long _expectedLength;
         private readonly long _blockSize;
         private readonly DataTransferOrder _transferOrder;
-        private readonly ClientDiagnostics _clientDiagnostics;
         private readonly StorageResourceItemProperties _sourceProperties;

         public CommitChunkHandler(
             long expectedLength,
             long blockSize,
             Behaviors behaviors,
             DataTransferOrder transferOrder,
-            ClientDiagnostics clientDiagnostics,
             StorageResourceItemProperties sourceProperties,
             CancellationToken cancellationToken)
         {
@@ -67,7 +56,14 @@ public CommitChunkHandler(
                 throw Errors.InvalidExpectedLength(expectedLength);
             }
             Argument.AssertNotNull(behaviors, nameof(behaviors));
-            Argument.AssertNotNull(clientDiagnostics, nameof(clientDiagnostics));

+            _cancellationToken = cancellationToken;
+            // Set bytes transferred to block size because we transferred the initial block
+            _bytesTransferred = blockSize;
+            _expectedLength = expectedLength;
+            _blockSize = blockSize;
+            _transferOrder = transferOrder;
+            _sourceProperties = sourceProperties;
+
             _queuePutBlockTask = behaviors.QueuePutBlockTask
                 ?? throw Errors.ArgumentNull(nameof(behaviors.QueuePutBlockTask));
@@ -78,152 +74,61 @@ public CommitChunkHandler(
             _invokeFailedEventHandler = behaviors.InvokeFailedHandler
                 ?? throw Errors.ArgumentNull(nameof(behaviors.InvokeFailedHandler));

-            // Set expected length to perform commit task
-            _expectedLength = expectedLength;
-
-            // Create channel of finished Stage Chunk Args to update the bytesTransferred
-            // and for ending tasks like commit block.
-            // The size of the channel should never exceed 50k (limit on blocks in a block blob).
-            // and that's in the worst case that we never read from the channel and had a maximum chunk blob.
-            _stageChunkChannel = Channel.CreateUnbounded<StageChunkEventArgs>(
-                new UnboundedChannelOptions()
-                {
-                    // Single reader is required as we can only read and write to bytesTransferred value
-                    SingleReader = true,
-                });
-            _cancellationToken = cancellationToken;
-
-            // Set bytes transferred to block size because we transferred the initial block
-            _bytesTransferred = blockSize;
-
-            _processStageChunkEvents = Task.Run(() => NotifyOfPendingStageChunkEvents());
-
-            _blockSize = blockSize;
-            _transferOrder = transferOrder;
-            if (_transferOrder == DataTransferOrder.Sequential)
-            {
-                _commitBlockHandler += SequentialBlockEvent;
-            }
-            _commitBlockHandler += ConcurrentBlockEvent;
-            _clientDiagnostics = clientDiagnostics;
-            _sourceProperties = sourceProperties;
+            _stageChunkProcessor = ChannelProcessing.NewProcessor<QueueStageChunkArgs>(
+                readers: 1,
+                capacity: DataMovementConstants.Channels.StageChunkCapacity);
+            _stageChunkProcessor.Process = ProcessCommitRange;
         }

         public void Dispose()
         {
-            // We no longer have to read from the channel. We are not expecting any more requests.
-            _stageChunkChannel.Writer.TryComplete();
-            DisposeHandlers();
+            _stageChunkProcessor.TryComplete();
         }

-        private void DisposeHandlers()
+        public async ValueTask QueueChunkAsync(QueueStageChunkArgs args)
         {
-            if (_transferOrder == DataTransferOrder.Sequential)
-            {
-                _commitBlockHandler -= SequentialBlockEvent;
-            }
-            _commitBlockHandler -= ConcurrentBlockEvent;
+            await _stageChunkProcessor.QueueAsync(args).ConfigureAwait(false);
         }

-        private async Task ConcurrentBlockEvent(StageChunkEventArgs args)
+        private async Task ProcessCommitRange(QueueStageChunkArgs args, CancellationToken cancellationToken = default)
         {
             try
             {
-                if (args.Success)
-                {
-                    // Let's add to the channel, and our notifier will handle the chunks.
-                    _stageChunkChannel.Writer.TryWrite(args);
-                }
-                else
-                {
-                    // Log an unexpected error since it came back unsuccessful
-                    throw args.Exception;
-                }
-            }
-            catch (Exception ex)
-            {
-                // Log an unexpected error since it came back unsuccessful
-                await _invokeFailedEventHandler(ex).ConfigureAwait(false);
-            }
-        }
+                _bytesTransferred += args.BytesTransferred;
+                _reportProgressInBytes(args.BytesTransferred);

-        private async Task NotifyOfPendingStageChunkEvents()
-        {
-            try
-            {
-                while (await _stageChunkChannel.Reader.WaitToReadAsync(_cancellationToken).ConfigureAwait(false))
+                if (_bytesTransferred == _expectedLength)
                 {
-                    // Read one event argument at a time.
-                    StageChunkEventArgs args = await _stageChunkChannel.Reader.ReadAsync(_cancellationToken).ConfigureAwait(false);
-
-                    // don't need to use Interlocked.Add() as we are reading one event at a time
-                    // and _bytesTransferred is not being read/updated from any other thread
-                    _bytesTransferred += args.BytesTransferred;
-
-                    // Report the incremental bytes transferred
-                    _reportProgressInBytes(args.BytesTransferred);
-
-                    if (_bytesTransferred == _expectedLength)
-                    {
-                        // Add CommitBlockList task to the channel
-                        await _queueCommitBlockTask(_sourceProperties).ConfigureAwait(false);
-                    }
-                    else if (_bytesTransferred > _expectedLength)
-                    {
-                        throw Errors.MismatchLengthTransferred(
-                            expectedLength: _expectedLength,
-                            actualLength: _bytesTransferred);
-                    }
+                    // Add CommitBlockList task to the channel
+                    await _queueCommitBlockTask(_sourceProperties).ConfigureAwait(false);
                 }
-            }
-            catch (Exception ex)
-            {
-                await _invokeFailedEventHandler(ex).ConfigureAwait(false);
-            }
-        }
-
-        private async Task SequentialBlockEvent(StageChunkEventArgs args)
-        {
-            try
-            {
-                if (args.Success)
+                else if (_bytesTransferred < _expectedLength)
                 {
-                    long oldOffset = args.Offset;
-                    long newOffset = oldOffset + _blockSize;
-                    if (newOffset < _expectedLength)
+                    // If this is a sequential transfer, we need to queue the next chunk
+                    if (_transferOrder == DataTransferOrder.Sequential)
                     {
+                        long newOffset = args.Offset + _blockSize;
                         long blockLength = (newOffset + _blockSize < _expectedLength) ?
-                            _blockSize :
-                            _expectedLength - newOffset;
-                        await _queuePutBlockTask(newOffset, blockLength, _expectedLength, _sourceProperties).ConfigureAwait(false);
+                            _blockSize :
+                            _expectedLength - newOffset;
+                        await _queuePutBlockTask(
+                            newOffset,
+                            blockLength,
+                            _expectedLength,
+                            _sourceProperties).ConfigureAwait(false);
                     }
                 }
-                else
+                else // _bytesTransferred > _expectedLength
                 {
-                    // Log an unexpected error since it came back unsuccessful
-                    throw args.Exception;
+                    throw Errors.MismatchLengthTransferred(
+                        expectedLength: _expectedLength,
+                        actualLength: _bytesTransferred);
                 }
             }
             catch (Exception ex)
             {
-                // Log an unexpected error since it came back unsuccessful
                 await _invokeFailedEventHandler(ex).ConfigureAwait(false);
             }
         }
-
-        public async Task InvokeEvent(StageChunkEventArgs args)
-        {
-            // There's a race condition where the event handler was disposed and an event
-            // was already invoked, we should skip over this as the download chunk handler
-            // was already disposed, and we should just ignore any more incoming events.
-            if (_commitBlockHandler != null)
-            {
-                await _commitBlockHandler.RaiseAsync(
-                    args,
-                    nameof(CommitChunkHandler),
-                    nameof(_commitBlockHandler),
-                    _clientDiagnostics).ConfigureAwait(false);
-            }
-        }
     }
 }
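Note on the new pattern: the refactor replaces an unbounded Channel<StageChunkEventArgs>, a background Task.Run reader, and the SyncAsyncEventHandler plumbing with a single bounded processor owned by the handler. IProcessor<T>, ChannelProcessing, and DataMovementConstants.Channels.StageChunkCapacity are internal DataMovement types that do not appear in this diff, so the following is only a minimal sketch of the pattern using the public System.Threading.Channels API; the type and member names here are assumptions, not the SDK's implementation.

// BoundedProcessor<T> is a hypothetical name; the SDK's internal
// IProcessor<T>/ChannelProcessing implementation may differ.
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

internal sealed class BoundedProcessor<T> : IDisposable
{
    private readonly Channel<T> _channel;
    private readonly Task _readLoop;

    // Assigned by the owner before queueing, mirroring
    // _stageChunkProcessor.Process = ProcessCommitRange in the diff above.
    public Func<T, CancellationToken, Task> Process { get; set; }

    public BoundedProcessor(int capacity, CancellationToken cancellationToken)
    {
        _channel = Channel.CreateBounded<T>(new BoundedChannelOptions(capacity)
        {
            SingleReader = true, // one consumer, so state like _bytesTransferred needs no Interlocked
            FullMode = BoundedChannelFullMode.Wait // writers wait when full instead of growing without bound
        });
        _readLoop = Task.Run(() => ReadLoopAsync(cancellationToken), cancellationToken);
    }

    // Awaits when the channel is at capacity; this is the backpressure the PR adds.
    public ValueTask QueueAsync(T item, CancellationToken token = default)
        => _channel.Writer.WriteAsync(item, token);

    public void TryComplete() => _channel.Writer.TryComplete();

    public void Dispose() => TryComplete();

    private async Task ReadLoopAsync(CancellationToken token)
    {
        await foreach (T item in _channel.Reader.ReadAllAsync(token).ConfigureAwait(false))
        {
            await Process(item, token).ConfigureAwait(false);
        }
    }
}

The bound is the point of the PR title: the old comment conceded the unbounded channel could, in the worst case, buffer an entry per block up to the 50,000-block limit of a block blob, whereas a bounded channel with FullMode = Wait holds writers at a fixed capacity instead.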
21 changes: 1 addition & 20 deletions sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs
@@ -504,26 +504,7 @@ internal static long ParseRangeTotalLength(string range)
             return long.Parse(range.Substring(lengthSeparator + 1), CultureInfo.InvariantCulture);
         }

-        internal static List<(long Offset, long Size)> GetRangeList(long blockSize, long fileLength)
-        {
-            // The list tracking blocks IDs we're going to commit
-            List<(long Offset, long Size)> partitions = new List<(long, long)>();
-
-            // Partition the stream into individual blocks
-            foreach ((long Offset, long Length) block in GetPartitionIndexes(fileLength, blockSize))
-            {
-                /* We need to do this first! Length is calculated on the fly based on stream buffer
-                 * contents; We need to record the partition data first before consuming the stream
-                 * asynchronously. */
-                partitions.Add(block);
-            }
-            return partitions;
-        }
-
-        /// <summary>
-        /// Partition a stream into a series of blocks buffered as needed by an array pool.
-        /// </summary>
-        private static IEnumerable<(long Offset, long Length)> GetPartitionIndexes(
+        protected static IEnumerable<(long Offset, long Length)> GetRanges(
             long streamLength, // StreamLength needed to divide before hand
             long blockSize)
         {
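Note: GetRangeList, which eagerly materialized the partitions into a List, is deleted; callers now consume the renamed, protected GetRanges enumerator directly. The method body is collapsed in this diff, so the following is a minimal sketch consistent with the signature, not the SDK's actual code.

// Sketch only: GetRangesSketch approximates what a partitioner with GetRanges'
// signature would do; the real body is not shown in this diff.
using System;
using System.Collections.Generic;

internal static class RangeSketch
{
    public static IEnumerable<(long Offset, long Length)> GetRangesSketch(
        long streamLength,
        long blockSize)
    {
        for (long offset = 0; offset < streamLength; offset += blockSize)
        {
            // Clip the final range so the lengths sum to streamLength.
            yield return (offset, Math.Min(blockSize, streamLength - offset));
        }
    }
}

For example, streamLength = 10 and blockSize = 4 would yield (0, 4), (4, 4), (8, 2): offsets step by blockSize and the final range is clipped to the remaining bytes.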
27 changes: 27 additions & 0 deletions sdk/storage/Azure.Storage.DataMovement/src/QueueStageChunkArgs.cs
@@ -0,0 +1,27 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+namespace Azure.Storage.DataMovement
+{
+    /// <summary>
+    /// This class is interchangeable for
+    /// Stage Block (Put Block), Stage Block From Uri (Put Block From URL),
+    /// Append Block (Append Block), Append Block From Uri (Append Block From URL),
+    /// Upload Page (Put Page), Upload Pages From Uri (Put Pages From URL).
+    ///
+    /// Essentially any transfer operation that must end in a Commit Block List
+    /// will use this internal event argument to track success and the bytes
+    /// transferred, to ensure the correct number of bytes ends up committed.
+    /// </summary>
+    internal class QueueStageChunkArgs
+    {
+        public long Offset { get; }
+        public long BytesTransferred { get; }
+
+        public QueueStageChunkArgs(long offset, long bytesTransferred)
+        {
+            Offset = offset;
+            BytesTransferred = bytesTransferred;
+        }
+    }
+}
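Usage note: producers no longer raise an event through InvokeEvent; they construct a QueueStageChunkArgs and queue it on the handler. The wrapper method below is invented for illustration; only QueueChunkAsync and QueueStageChunkArgs are APIs introduced by this PR.

using System.Threading.Tasks;

// Hypothetical call site: OnChunkStagedAsync is not part of the PR.
internal static class ChunkCompletionExample
{
    public static async Task OnChunkStagedAsync(
        CommitChunkHandler handler,
        long offset,
        long bytesTransferred)
    {
        // The handler's single reader serially updates _bytesTransferred and
        // decides whether to stage the next block or queue the final commit.
        await handler.QueueChunkAsync(
            new QueueStageChunkArgs(offset, bytesTransferred)).ConfigureAwait(false);
    }
}

Note that Offset only matters for sequential transfers, where ProcessCommitRange uses it to compute the next block's offset; for concurrent transfers, BytesTransferred alone drives progress reporting and the final commit.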