Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public async Task UploadsStreamInBlocksIfLengthNotAvailable()

Assert.AreEqual(1, sink.Staged.Count);
Assert.AreEqual(s_response, info);
Assert.AreEqual(2, testPool.TotalRents); // while conceptually there is one rental, the second rental occurs upon checking for stream end on a Read() call
Assert.AreEqual(1, testPool.TotalRents);
Assert.AreEqual(0, testPool.CurrentCount);
AssertStaged(sink, content);
}
Expand Down Expand Up @@ -107,14 +107,14 @@ public async Task UploadsStreamInBlocksUnderSize()
clientMock.SetupGet(c => c.ClientConfiguration).CallBase();
SetupInternalStaging(clientMock, sink);

var uploader = new PartitionedUploader<BlobUploadOptions, BlobContentInfo>(BlockBlobClient.GetPartitionedUploaderBehaviors(clientMock.Object), new StorageTransferOptions() { MaximumTransferLength = 20}, s_validationEmpty, arrayPool: testPool);
var uploader = new PartitionedUploader<BlobUploadOptions, BlobContentInfo>(BlockBlobClient.GetPartitionedUploaderBehaviors(clientMock.Object), new StorageTransferOptions() { MaximumTransferLength = 10}, s_validationEmpty, arrayPool: testPool);
Response<BlobContentInfo> info = await InvokeUploadAsync(uploader, content);

Assert.AreEqual(2, sink.Staged.Count);
Assert.AreEqual(s_response, info);
AssertStaged(sink, content);

Assert.AreEqual(3, testPool.TotalRents);
Assert.AreEqual(2, testPool.TotalRents);
Assert.AreEqual(0, testPool.CurrentCount);
}

Expand All @@ -137,7 +137,7 @@ public async Task MergesLotsOfSmallBlocks()

Assert.AreEqual(1, sink.Staged.Count);
Assert.AreEqual(s_response, info);
Assert.AreEqual(2, testPool.TotalRents); // while conceptually there is one rental, the second rental occurs upon checking for stream end on a Read() call
Assert.AreEqual(1, testPool.TotalRents);
Assert.AreEqual(0, testPool.CurrentCount);
AssertStaged(sink, content);
}
Expand All @@ -161,7 +161,7 @@ public async Task SmallMaxWriteSize()

Assert.AreEqual(s_response, info);
Assert.AreEqual(0, testPool.CurrentCount);
Assert.AreEqual(41, testPool.TotalRents);
Assert.AreEqual(20, testPool.TotalRents);
AssertStaged(sink, content);

foreach (byte[] bytes in sink.Staged.Values.Select(val => val.Data))
Expand All @@ -186,14 +186,14 @@ public async Task MergesBlocksUntilTheyReachOverHalfMaxSize()
clientMock.SetupGet(c => c.ClientConfiguration).CallBase();
SetupInternalStaging(clientMock, sink);

var uploader = new PartitionedUploader<BlobUploadOptions, BlobContentInfo>(BlockBlobClient.GetPartitionedUploaderBehaviors(clientMock.Object), new StorageTransferOptions() { MaximumTransferLength = 20}, s_validationEmpty, arrayPool: testPool);
var uploader = new PartitionedUploader<BlobUploadOptions, BlobContentInfo>(BlockBlobClient.GetPartitionedUploaderBehaviors(clientMock.Object), new StorageTransferOptions() { MaximumTransferLength = 10}, s_validationEmpty, arrayPool: testPool);
Response<BlobContentInfo> info = await InvokeUploadAsync(uploader, content);

Assert.AreEqual(2, sink.Staged.Count);
// First two should be merged
CollectionAssert.AreEqual(new byte[] {0, 0, 0, 0, 0, 1, 1, 1, 1, 1 }, sink.Staged[sink.Blocks.First()].Data);
Assert.AreEqual(s_response, info);
Assert.AreEqual(3, testPool.TotalRents);
Assert.AreEqual(2, testPool.TotalRents);
Assert.AreEqual(0, testPool.CurrentCount);
AssertStaged(sink, content);
}
Expand All @@ -212,7 +212,7 @@ public async Task BlockIdsAre64BytesUniqueBase64Strings()
clientMock.SetupGet(c => c.ClientConfiguration).CallBase();
SetupInternalStaging(clientMock, sink);

var uploader = new PartitionedUploader<BlobUploadOptions, BlobContentInfo>(BlockBlobClient.GetPartitionedUploaderBehaviors(clientMock.Object), new StorageTransferOptions() { MaximumTransferLength = 20 }, s_validationEmpty);
var uploader = new PartitionedUploader<BlobUploadOptions, BlobContentInfo>(BlockBlobClient.GetPartitionedUploaderBehaviors(clientMock.Object), new StorageTransferOptions() { MaximumTransferLength = 10 }, s_validationEmpty);
Response<BlobContentInfo> info = await InvokeUploadAsync(uploader, content);

Assert.AreEqual(2, sink.Staged.Count);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ private delegate Task StageContentPartitionAsync<TContent>(
/// </summary>
private delegate Task<(Stream PartitionContent, ReadOnlyMemory<byte> PartitionChecksum)> GetNextStreamPartition(
Stream stream,
long minCount,
long maxCount,
long count,
long absolutePosition,
bool async,
CancellationToken cancellationToken);
Expand Down Expand Up @@ -373,14 +372,14 @@ public async Task<Response<TCompleteUploadReturn>> UploadInternal(
Stream bufferedContent;
if (UseMasterCrc && _masterCrcSupplier != default)
{
bufferedContent = await PooledMemoryStream.BufferStreamPartitionInternal(
content, length.Value, length.Value, _arrayPool,
maxArrayPoolRentalSize: default, async, cancellationToken).ConfigureAwait(false);
bufferedContent = new PooledMemoryStream(_arrayPool, Constants.MB);
await content.CopyToInternal(bufferedContent, async, cancellationToken).ConfigureAwait(false);
bufferedContent.Position = 0;
}
else
{
(bufferedContent, oneshotValidationOptions) = await BufferAndOptionalChecksumStreamInternal(
content, length.Value, length.Value, oneshotValidationOptions, async, cancellationToken)
content, length.Value, oneshotValidationOptions, async, cancellationToken)
.ConfigureAwait(false);
}
bucket.Add(bufferedContent);
Expand Down Expand Up @@ -438,11 +437,8 @@ private static long GetActualBlockSize(long? blockSize, long? totalLength)
/// <param name="source">
/// Stream to buffer.
/// </param>
/// <param name="minCount">
/// Minimum count to buffer from the stream.
/// </param>
/// <param name="maxCount">
/// Maximum count to buffer from the stream.
/// <param name="count">
/// Exact count to buffer from the stream.
/// </param>
/// <param name="validationOptions">
/// Validation options for the upload to determine if buffering is needed.
Expand All @@ -467,8 +463,7 @@ private static long GetActualBlockSize(long? blockSize, long? totalLength)
private async Task<(Stream Stream, UploadTransferValidationOptions ValidationOptions)>
BufferAndOptionalChecksumStreamInternal(
Stream source,
long minCount,
long maxCount,
long? count,
UploadTransferValidationOptions validationOptions,
bool async,
CancellationToken cancellationToken)
Expand All @@ -488,14 +483,16 @@ private static long GetActualBlockSize(long? blockSize, long? totalLength)
.SetupChecksumCalculatingReadStream(source, validationOptions.ChecksumAlgorithm);
}

PooledMemoryStream bufferedContent = await PooledMemoryStream.BufferStreamPartitionInternal(
source,
minCount,
maxCount,
_arrayPool,
maxArrayPoolRentalSize: default,
async,
cancellationToken).ConfigureAwait(false);
Stream bufferedContent = new PooledMemoryStream(_arrayPool, Constants.MB);
if (count.HasValue)
{
await source.CopyToExactInternal(bufferedContent, count.Value, async, cancellationToken).ConfigureAwait(false);
}
else
{
await source.CopyToInternal(bufferedContent, async, cancellationToken).ConfigureAwait(false);
}
bufferedContent.Position = 0;

if (usingChecksumStream)
{
Expand Down Expand Up @@ -893,12 +890,6 @@ private static async IAsyncEnumerable<ContentPartition<Stream>> GetStreamPartiti
bool async,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
// The minimum amount of data we'll accept from a stream before
// splitting another block. Code that sets `blockSize` will always
// set it to a positive number. Min() only avoids edge case where
// user sets their block size to 1.
long acceptableBlockSize = Math.Max(1, blockSize / 2);

// if we know the data length, assert boundaries before spending resources uploading beyond service capabilities
if (streamLength.HasValue)
{
Expand All @@ -910,8 +901,6 @@ private static async IAsyncEnumerable<ContentPartition<Stream>> GetStreamPartiti
{
throw Errors.InsufficientStorageTransferOptions(streamLength.Value, blockSize, minRequiredBlockSize);
}
// bring min up to our min required by the service
acceptableBlockSize = Math.Max(acceptableBlockSize, minRequiredBlockSize);
}

long read;
Expand All @@ -920,7 +909,6 @@ private static async IAsyncEnumerable<ContentPartition<Stream>> GetStreamPartiti
{
(Stream partition, ReadOnlyMemory<byte> partitionChecksum) = await getNextPartition(
stream,
acceptableBlockSize,
blockSize,
absolutePosition,
async,
Expand Down Expand Up @@ -954,11 +942,8 @@ private static async IAsyncEnumerable<ContentPartition<Stream>> GetStreamPartiti
/// <param name="stream">
/// Stream to buffer a partition from.
/// </param>
/// <param name="minCount">
/// Minimum amount of data to wait on before finalizing buffer.
/// </param>
/// <param name="maxCount">
/// Max amount of data to buffer before cutting off for the next.
/// <param name="count">
/// Amount of data to wait on before finalizing buffer.
/// </param>
/// <param name="absolutePosition">
/// Offset of this stream relative to the large stream.
Expand All @@ -974,8 +959,7 @@ private static async IAsyncEnumerable<ContentPartition<Stream>> GetStreamPartiti
/// </returns>
private async Task<(Stream PartitionContent, ReadOnlyMemory<byte> PartitionChecksum)> GetBufferedPartitionInternal(
Stream stream,
long minCount,
long maxCount,
long count,
long absolutePosition,
bool async,
CancellationToken cancellationToken)
Expand All @@ -984,8 +968,7 @@ private static async IAsyncEnumerable<ContentPartition<Stream>> GetStreamPartiti
(Stream slicedStream, UploadTransferValidationOptions validationOptions)
= await BufferAndOptionalChecksumStreamInternal(
stream,
minCount,
maxCount,
count,
new UploadTransferValidationOptions { ChecksumAlgorithm = _validationAlgorithm },
async,
cancellationToken).ConfigureAwait(false);
Expand All @@ -1002,10 +985,7 @@ private static async IAsyncEnumerable<ContentPartition<Stream>> GetStreamPartiti
/// <param name="stream">
/// Stream to wrap.
/// </param>
/// <param name="minCount">
/// Unused, but part of <see cref="GetNextStreamPartition"/> definition.
/// </param>
/// <param name="maxCount">
/// <param name="count">
/// Length of this facade stream.
/// </param>
/// <param name="absolutePosition">
Expand All @@ -1020,8 +1000,7 @@ private static async IAsyncEnumerable<ContentPartition<Stream>> GetStreamPartiti
/// </returns>
private async Task<(Stream PartitionContent, ReadOnlyMemory<byte> PartitionChecksum)> GetStreamedPartitionInternal(
Stream stream,
long minCount,
long maxCount,
long count,
long absolutePosition,
bool async,
CancellationToken cancellationToken)
Expand All @@ -1030,7 +1009,7 @@ private static async IAsyncEnumerable<ContentPartition<Stream>> GetStreamPartiti
{
throw Errors.InvalidArgument(nameof(stream));
}
var partitionStream = WindowStream.GetWindow(stream, maxCount);
var partitionStream = WindowStream.GetWindow(stream, count);
// this resets stream position for us
var checksum = await ContentHasher.GetHashOrDefaultInternal(
partitionStream,
Expand Down
Loading