diff --git a/sdk/storage/Azure.Storage.Blobs/tests/PartitionedUploaderTests.cs b/sdk/storage/Azure.Storage.Blobs/tests/PartitionedUploaderTests.cs
index 83273eaa1f87..3ad195534f5c 100644
--- a/sdk/storage/Azure.Storage.Blobs/tests/PartitionedUploaderTests.cs
+++ b/sdk/storage/Azure.Storage.Blobs/tests/PartitionedUploaderTests.cs
@@ -61,7 +61,7 @@ public async Task UploadsStreamInBlocksIfLengthNotAvailable()

             Assert.AreEqual(1, sink.Staged.Count);
             Assert.AreEqual(s_response, info);
-            Assert.AreEqual(2, testPool.TotalRents); // while conceptually there is one rental, the second rental occurs upon checking for stream end on a Read() call
+            Assert.AreEqual(1, testPool.TotalRents);
             Assert.AreEqual(0, testPool.CurrentCount);
             AssertStaged(sink, content);
         }
@@ -107,14 +107,14 @@ public async Task UploadsStreamInBlocksUnderSize()
             clientMock.SetupGet(c => c.ClientConfiguration).CallBase();
             SetupInternalStaging(clientMock, sink);

-            var uploader = new PartitionedUploader<BlobUploadOptions, BlobContentInfo>(BlockBlobClient.GetPartitionedUploaderBehaviors(clientMock.Object), new StorageTransferOptions() { MaximumTransferLength = 20}, s_validationEmpty, arrayPool: testPool);
+            var uploader = new PartitionedUploader<BlobUploadOptions, BlobContentInfo>(BlockBlobClient.GetPartitionedUploaderBehaviors(clientMock.Object), new StorageTransferOptions() { MaximumTransferLength = 10}, s_validationEmpty, arrayPool: testPool);

             Response<BlobContentInfo> info = await InvokeUploadAsync(uploader, content);

             Assert.AreEqual(2, sink.Staged.Count);
             Assert.AreEqual(s_response, info);
             AssertStaged(sink, content);

-            Assert.AreEqual(3, testPool.TotalRents);
+            Assert.AreEqual(2, testPool.TotalRents);
             Assert.AreEqual(0, testPool.CurrentCount);
         }
@@ -137,7 +137,7 @@ public async Task MergesLotsOfSmallBlocks()

             Assert.AreEqual(1, sink.Staged.Count);
             Assert.AreEqual(s_response, info);
-            Assert.AreEqual(2, testPool.TotalRents); // while conceptually there is one rental, the second rental occurs upon checking for stream end on a Read() call
+            Assert.AreEqual(1, testPool.TotalRents);
             Assert.AreEqual(0, testPool.CurrentCount);
             AssertStaged(sink, content);
         }
@@ -161,7 +161,7 @@ public async Task SmallMaxWriteSize()

             Assert.AreEqual(s_response, info);
             Assert.AreEqual(0, testPool.CurrentCount);
-            Assert.AreEqual(41, testPool.TotalRents);
+            Assert.AreEqual(20, testPool.TotalRents);
             AssertStaged(sink, content);

             foreach (byte[] bytes in sink.Staged.Values.Select(val => val.Data))
@@ -186,14 +186,14 @@ public async Task MergesBlocksUntilTheyReachOverHalfMaxSize()
             clientMock.SetupGet(c => c.ClientConfiguration).CallBase();
             SetupInternalStaging(clientMock, sink);

-            var uploader = new PartitionedUploader<BlobUploadOptions, BlobContentInfo>(BlockBlobClient.GetPartitionedUploaderBehaviors(clientMock.Object), new StorageTransferOptions() { MaximumTransferLength = 20}, s_validationEmpty, arrayPool: testPool);
+            var uploader = new PartitionedUploader<BlobUploadOptions, BlobContentInfo>(BlockBlobClient.GetPartitionedUploaderBehaviors(clientMock.Object), new StorageTransferOptions() { MaximumTransferLength = 10}, s_validationEmpty, arrayPool: testPool);
             Response<BlobContentInfo> info = await InvokeUploadAsync(uploader, content);

             Assert.AreEqual(2, sink.Staged.Count);
             // First two should be merged
             CollectionAssert.AreEqual(new byte[] {0, 0, 0, 0, 0, 1, 1, 1, 1, 1 }, sink.Staged[sink.Blocks.First()].Data);
             Assert.AreEqual(s_response, info);
-            Assert.AreEqual(3, testPool.TotalRents);
+            Assert.AreEqual(2, testPool.TotalRents);
             Assert.AreEqual(0, testPool.CurrentCount);
             AssertStaged(sink, content);
         }
@@ -212,7 +212,7 @@ public async Task BlockIdsAre64BytesUniqueBase64Strings()
             clientMock.SetupGet(c => c.ClientConfiguration).CallBase();
             SetupInternalStaging(clientMock, sink);

-            var uploader = new PartitionedUploader<BlobUploadOptions, BlobContentInfo>(BlockBlobClient.GetPartitionedUploaderBehaviors(clientMock.Object), new StorageTransferOptions() { MaximumTransferLength = 20 }, s_validationEmpty);
+            var uploader = new PartitionedUploader<BlobUploadOptions, BlobContentInfo>(BlockBlobClient.GetPartitionedUploaderBehaviors(clientMock.Object), new StorageTransferOptions() { MaximumTransferLength = 10 }, s_validationEmpty);
             Response<BlobContentInfo> info = await InvokeUploadAsync(uploader, content);

             Assert.AreEqual(2, sink.Staged.Count);
diff --git a/sdk/storage/Azure.Storage.Common/src/Shared/PartitionedUploader.cs b/sdk/storage/Azure.Storage.Common/src/Shared/PartitionedUploader.cs
index 8d4887a0508c..2107ec374595 100644
--- a/sdk/storage/Azure.Storage.Common/src/Shared/PartitionedUploader.cs
+++ b/sdk/storage/Azure.Storage.Common/src/Shared/PartitionedUploader.cs
@@ -85,8 +85,7 @@ private delegate Task StageContentPartitionAsync(
         /// </summary>
         private delegate Task<(Stream PartitionContent, ReadOnlyMemory<byte> PartitionChecksum)> GetNextStreamPartition(
             Stream stream,
-            long minCount,
-            long maxCount,
+            long count,
             long absolutePosition,
             bool async,
             CancellationToken cancellationToken);
@@ -373,14 +372,14 @@ public async Task<Response<TCompleteUploadReturn>> UploadInternal(
                 Stream bufferedContent;
                 if (UseMasterCrc && _masterCrcSupplier != default)
                 {
-                    bufferedContent = await PooledMemoryStream.BufferStreamPartitionInternal(
-                        content, length.Value, length.Value, _arrayPool,
-                        maxArrayPoolRentalSize: default, async, cancellationToken).ConfigureAwait(false);
+                    bufferedContent = new PooledMemoryStream(_arrayPool, Constants.MB);
+                    await content.CopyToInternal(bufferedContent, async, cancellationToken).ConfigureAwait(false);
+                    bufferedContent.Position = 0;
                 }
                 else
                 {
                     (bufferedContent, oneshotValidationOptions) = await BufferAndOptionalChecksumStreamInternal(
-                        content, length.Value, length.Value, oneshotValidationOptions, async, cancellationToken)
+                        content, length.Value, oneshotValidationOptions, async, cancellationToken)
                         .ConfigureAwait(false);
                 }
                 bucket.Add(bufferedContent);
@@ -438,11 +437,8 @@ private static long GetActualBlockSize(long? blockSize, long? totalLength)
         /// <param name="source">
         /// Stream to buffer.
         /// </param>
-        /// <param name="minCount">
-        /// Minimum count to buffer from the stream.
-        /// </param>
-        /// <param name="maxCount">
-        /// Maximum count to buffer from the stream.
+        /// <param name="count">
+        /// Exact count to buffer from the stream.
         /// </param>
        /// <param name="validationOptions">
        /// Validation options for the upload to determine if buffering is needed.
@@ -467,8 +463,7 @@ private static long GetActualBlockSize(long? blockSize, long? totalLength)
         private async Task<(Stream Stream, UploadTransferValidationOptions ValidationOptions)> BufferAndOptionalChecksumStreamInternal(
             Stream source,
-            long minCount,
-            long maxCount,
+            long? count,
             UploadTransferValidationOptions validationOptions,
             bool async,
             CancellationToken cancellationToken)
@@ -488,14 +483,16 @@ private static long GetActualBlockSize(long? blockSize, long? totalLength)
                     .SetupChecksumCalculatingReadStream(source, validationOptions.ChecksumAlgorithm);
             }

-            PooledMemoryStream bufferedContent = await PooledMemoryStream.BufferStreamPartitionInternal(
-                source,
-                minCount,
-                maxCount,
-                _arrayPool,
-                maxArrayPoolRentalSize: default,
-                async,
-                cancellationToken).ConfigureAwait(false);
+            Stream bufferedContent = new PooledMemoryStream(_arrayPool, Constants.MB);
+            if (count.HasValue)
+            {
+                await source.CopyToExactInternal(bufferedContent, count.Value, async, cancellationToken).ConfigureAwait(false);
+            }
+            else
+            {
+                await source.CopyToInternal(bufferedContent, async, cancellationToken).ConfigureAwait(false);
+            }
+            bufferedContent.Position = 0;

             if (usingChecksumStream)
             {
@@ -893,12 +890,6 @@ private static async IAsyncEnumerable> GetStreamPartiti
             bool async,
             [EnumeratorCancellation] CancellationToken cancellationToken)
         {
-            // The minimum amount of data we'll accept from a stream before
-            // splitting another block. Code that sets `blockSize` will always
-            // set it to a positive number. Min() only avoids edge case where
-            // user sets their block size to 1.
-            long acceptableBlockSize = Math.Max(1, blockSize / 2);
-
             // if we know the data length, assert boundaries before spending resources uploading beyond service capabilities
             if (streamLength.HasValue)
             {
@@ -910,8 +901,6 @@ private static async IAsyncEnumerable> GetStreamPartiti
                 {
                     throw Errors.InsufficientStorageTransferOptions(streamLength.Value, blockSize, minRequiredBlockSize);
                 }
-                // bring min up to our min required by the service
-                acceptableBlockSize = Math.Max(acceptableBlockSize, minRequiredBlockSize);
             }

             long read;
@@ -920,7 +909,6 @@ private static async IAsyncEnumerable> GetStreamPartiti
             {
                 (Stream partition, ReadOnlyMemory<byte> partitionChecksum) = await getNextPartition(
                     stream,
-                    acceptableBlockSize,
                     blockSize,
                     absolutePosition,
                     async,
@@ -954,11 +942,8 @@ private static async IAsyncEnumerable> GetStreamPartiti
         /// <param name="stream">
         /// Stream to buffer a partition from.
         /// </param>
-        /// <param name="minCount">
-        /// Minimum amount of data to wait on before finalizing buffer.
-        /// </param>
-        /// <param name="maxCount">
-        /// Max amount of data to buffer before cutting off for the next.
+        /// <param name="count">
+        /// Amount of data to wait on before finalizing buffer.
         /// </param>
         /// <param name="absolutePosition">
         /// Offset of this stream relative to the large stream.
@@ -974,8 +959,7 @@ private static async IAsyncEnumerable> GetStreamPartiti
         private async Task<(Stream PartitionContent, ReadOnlyMemory<byte> PartitionChecksum)> GetBufferedPartitionInternal(
             Stream stream,
-            long minCount,
-            long maxCount,
+            long count,
             long absolutePosition,
             bool async,
             CancellationToken cancellationToken)
@@ -984,8 +968,7 @@ private static async IAsyncEnumerable> GetStreamPartiti
             (Stream slicedStream, UploadTransferValidationOptions validationOptions) = await BufferAndOptionalChecksumStreamInternal(
                 stream,
-                minCount,
-                maxCount,
+                count,
                 new UploadTransferValidationOptions { ChecksumAlgorithm = _validationAlgorithm },
                 async,
                 cancellationToken).ConfigureAwait(false);
@@ -1002,10 +985,7 @@ private static async IAsyncEnumerable> GetStreamPartiti
         /// <param name="stream">
         /// Stream to wrap.
         /// </param>
-        /// <param name="minCount">
-        /// Unused, but part of definition.
-        /// </param>
-        /// <param name="maxCount">
+        /// <param name="count">
         /// Length of this facade stream.
         /// </param>
         /// <param name="absolutePosition">
@@ -1020,8 +1000,7 @@ private static async IAsyncEnumerable> GetStreamPartiti
         private async Task<(Stream PartitionContent, ReadOnlyMemory<byte> PartitionChecksum)> GetStreamedPartitionInternal(
             Stream stream,
-            long minCount,
-            long maxCount,
+            long count,
             long absolutePosition,
             bool async,
             CancellationToken cancellationToken)
@@ -1030,7 +1009,7 @@ private static async IAsyncEnumerable> GetStreamPartiti
             {
                 throw Errors.InvalidArgument(nameof(stream));
             }
-            var partitionStream = WindowStream.GetWindow(stream, maxCount);
+            var partitionStream = WindowStream.GetWindow(stream, count);
             // this resets stream position for us
             var checksum = await ContentHasher.GetHashOrDefaultInternal(
                 partitionStream,
diff --git a/sdk/storage/Azure.Storage.Common/src/Shared/PooledMemoryStream.cs b/sdk/storage/Azure.Storage.Common/src/Shared/PooledMemoryStream.cs
index 3e218d18a90a..d6cf3d8094a3 100644
--- a/sdk/storage/Azure.Storage.Common/src/Shared/PooledMemoryStream.cs
+++ b/sdk/storage/Azure.Storage.Common/src/Shared/PooledMemoryStream.cs
@@ -17,8 +17,6 @@ namespace Azure.Storage.Shared
     /// </summary>
     internal class PooledMemoryStream : Stream
     {
-        private const int DefaultMaxArrayPoolRentalSize = 128 * Constants.MB;
-
         private class BufferPartition
         {
            /// <summary>
@@ -35,7 +33,7 @@ private class BufferPartition
         /// <summary>
         /// Size to rent from MemoryPool.
         /// </summary>
-        public int MaxArraySize { get; }
+        private int _newBufferSize;
        /// <summary>
        /// The backing array pool.
        /// </summary>
@@ -49,175 +47,30 @@ private class BufferPartition
         /// </summary>
         private List<BufferPartition> BufferSet { get; } = new List<BufferPartition>();

-        public PooledMemoryStream(ArrayPool<byte> arrayPool, int maxArraySize)
-        {
-            ArrayPool = arrayPool;
-            MaxArraySize = maxArraySize;
-        }
-
         /// <summary>
-        /// Parameterless constructor for mocking.
+        /// Creates a new, empty memory stream based on the given ArrayPool.
         /// </summary>
-        public PooledMemoryStream() { }
-
-        /// <summary>
-        /// Buffers a portion of the given stream, starting at its current position,
-        /// returning the buffered stream partition. The provided stream position will
-        /// be incremented by the amount of bytes buffered.
-        /// </summary>
-        /// <param name="stream">
-        /// Stream to buffer from.
-        /// </param>
-        /// <param name="minCount">
-        /// Minimum number of bytes to buffer. This method will not return until at
-        /// least this many bytes have been read from <paramref name="stream"/> or the
-        /// stream completes.
-        /// </param>
-        /// <param name="maxCount">
-        /// Maximum number of bytes to buffer.
-        /// </param>
         /// <param name="arrayPool">
-        /// Pool to rent buffer space from.
-        /// </param>
-        /// <param name="maxArrayPoolRentalSize">
-        /// Max size we can request from the array pool.
+        /// Pool to rent memory from.
         /// </param>
-        /// <param name="async">
-        /// Whether to perform this operation asynchronously.
+        /// <param name="bufferSize">
+        /// Rental size for the individual buffers that will make up the total stream.
         /// </param>
-        /// <param name="cancellationToken">
-        /// Cancellation token.
+        /// <param name="initialBufferSize">
+        /// Size of an initial rental from the array pool. No initial buffer will be rented when default.
         /// </param>
-        /// <returns>
-        /// The buffered stream partition with memory backed by an array pool.
-        /// </returns>
-        internal static async Task<PooledMemoryStream> BufferStreamPartitionInternal(
-            Stream stream,
-            long minCount,
-            long maxCount,
-            ArrayPool<byte> arrayPool,
-            int? maxArrayPoolRentalSize,
-            bool async,
-            CancellationToken cancellationToken)
+        public PooledMemoryStream(ArrayPool<byte> arrayPool, int bufferSize, int? initialBufferSize = default)
         {
-            long totalRead = 0;
-            var streamPartition = new PooledMemoryStream(arrayPool, maxArrayPoolRentalSize ?? DefaultMaxArrayPoolRentalSize);
-
-            // max count to write into a single array
-            int maxCountIndividualBuffer;
-            // min count to write into a single array
-            int minCountIndividualBuffer;
-            // the amount that was written into the current array
-            int readIndividualBuffer;
-            do
+            ArrayPool = arrayPool;
+            _newBufferSize = bufferSize;
+            if (initialBufferSize.HasValue)
             {
-                // buffer to write to
-                byte[] buffer;
-                // offset to start writing at
-                int offset;
-                BufferPartition latestBuffer = streamPartition.GetLatestBufferWithAvailableSpaceOrDefault();
-                // whether we got a brand new buffer to write into
-                bool newbuffer;
-                if (latestBuffer != default)
+                BufferSet.Add(new()
                 {
-                    buffer = latestBuffer.Buffer;
-                    offset = latestBuffer.DataLength;
-                    newbuffer = false;
-                }
-                else
-                {
-                    buffer = arrayPool.Rent((int)Math.Min(maxCount - totalRead, streamPartition.MaxArraySize));
-                    offset = 0;
-                    newbuffer = true;
-                }
-
-                // limit max and min count for this buffer by buffer length
-                maxCountIndividualBuffer = (int)Math.Min(maxCount - totalRead, buffer.Length - offset);
-                // definitionally limited by max; we won't ever have a swapped min/max range
-                minCountIndividualBuffer = (int)Math.Min(minCount - totalRead, maxCountIndividualBuffer);
-
-                readIndividualBuffer = await ReadLoopInternal(
-                    stream,
-                    buffer,
-                    offset: offset,
-                    minCountIndividualBuffer,
-                    maxCountIndividualBuffer,
-                    async,
-                    cancellationToken).ConfigureAwait(false);
-                // if nothing was placed in a brand new array
-                if (readIndividualBuffer == 0 && newbuffer)
-                {
-                    arrayPool.Return(buffer);
-                }
-                // if brand new array and we did place data in it
-                else if (newbuffer)
-                {
-                    streamPartition.BufferSet.Add(new BufferPartition
-                    {
-                        Buffer = buffer,
-                        DataLength = readIndividualBuffer
-                    });
-                }
-                // added to an existing array that was not entirely filled
-                else
-                {
-                    latestBuffer.DataLength += readIndividualBuffer;
-                }
-
-                totalRead += readIndividualBuffer;
-
-                /* If we filled the buffer this loop, then quitting on min count is pointless. The point of quitting
-                 * on min count is when the source stream doesn't have available bytes and we've reached an amount worth
-                 * sending instead of blocking on. If we filled the available array, we don't actually know whether more
-                 * data is available yet, as we limited our read for reasons outside the stream state. We should therefore
-                 * try another read regardless of whether we hit min count.
-                 */
-            } while (
-                // stream is done if this value is zero; no other check matters
-                readIndividualBuffer != 0 &&
-                // stop filling the partition if we've hit the max size of the partition
-                totalRead < maxCount &&
-                // stop filling the partition if we've reached min count and we know we've hit at least a pause in the stream
-                (totalRead < minCount || readIndividualBuffer == maxCountIndividualBuffer));
-
-            return streamPartition;
-        }
-
-        /// <summary>
-        /// Loops Read() calls into buffer until minCount is reached or stream returns 0.
-        /// </summary>
-        /// <returns>Bytes read.</returns>
-        /// <remarks>
-        /// This method may have read bytes even if it has reached the confirmed end of stream. You will have to call
-        /// this method again and read zero bytes to get that confirmation.
-        /// </remarks>
-        private static async Task<int> ReadLoopInternal(Stream stream, byte[] buffer, int offset, int minCount, int maxCount, bool async, CancellationToken cancellationToken)
-        {
-            if (minCount > maxCount)
-            {
-                throw new ArgumentException($"{nameof(minCount)} cannot be greater than {nameof(maxCount)}.");
+                    Buffer = ArrayPool.Rent(initialBufferSize.Value),
+                    DataLength = 0,
+                });
             }
-            if (maxCount <= 0)
-            {
-                throw new ArgumentException("Cannot read a non-positive number of bytes.");
-            }
-
-            int totalRead = 0;
-            do
-            {
-                int read = async
-                    ? await stream.ReadAsync(buffer, offset + totalRead, maxCount - totalRead, cancellationToken).ConfigureAwait(false)
-                    : stream.Read(buffer, offset + totalRead, maxCount - totalRead);
-                // either we have read maxCount in total or the stream has ended
-                if (read == 0)
-                {
-                    break;
-                }
-                totalRead += read;
-                // we always request the number that will bring our total read to maxCount
-                // if the stream can only give us so much at the moment and we've at least hit minCount, we can exit
-            } while (totalRead < minCount);
-            return totalRead;
         }

         public override bool CanRead => true;
@@ -236,6 +89,20 @@ public override void Flush()
         }

         public override int Read(byte[] buffer, int offset, int count)
+            => ReadImpl(new Span<byte>(buffer, offset, count));
+
+        public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+            => Task.FromResult(ReadImpl(new Span<byte>(buffer, offset, count)));
+
+#if NETCOREAPP3_1_OR_GREATER || NETSTANDARD2_1_OR_GREATER
+        public override int Read(Span<byte> buffer)
+            => ReadImpl(buffer);
+
+        public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
+            => new(ReadImpl(buffer.Span));
+#endif
+
+        private int ReadImpl(Span<byte> destBuffer)
         {
             if (Position >= Length)
             {
@@ -243,15 +110,15 @@ public override int Read(byte[] buffer, int offset, int count)
             }

             int read = 0;
-            while (read < count && Position < Length)
+            while (read < destBuffer.Length && Position < Length)
             {
                 (byte[] currentBuffer, int bufferCount, long offsetOfBuffer) = GetBufferFromPosition();
                 int toCopy = (int)Min(
                     Length - Position,
                     bufferCount - (Position - offsetOfBuffer),
-                    count - read);
-                Array.Copy(currentBuffer, Position - offsetOfBuffer, buffer, read, toCopy);
+                    destBuffer.Length - read);
+                new Span<byte>(currentBuffer, (int)(Position - offsetOfBuffer), toCopy).CopyTo(destBuffer.Slice(read));
                 read += toCopy;
                 Position += toCopy;
             }

@@ -328,14 +195,34 @@ public override void SetLength(long value)
         }

         public override void Write(byte[] buffer, int offset, int count)
+            => WriteImpl(new ReadOnlySpan<byte>(buffer, offset, count));
+
+        public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+        {
+            WriteImpl(new ReadOnlySpan<byte>(buffer, offset, count));
+            return Task.CompletedTask;
+        }
+
+#if NETCOREAPP3_1_OR_GREATER || NETSTANDARD2_1_OR_GREATER
+        public override void Write(ReadOnlySpan<byte> buffer)
+            => WriteImpl(buffer);
+
+        public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
+        {
+            WriteImpl(buffer.Span);
+            return new ValueTask(Task.CompletedTask);
+        }
+#endif
+
+        private void WriteImpl(ReadOnlySpan<byte> writeBuffer)
         {
-            while (count > 0)
+            while (writeBuffer.Length > 0)
             {

                 BufferPartition currentBuffer = GetLatestBufferWithAvailableSpaceOrDefault();
                 if (currentBuffer == default)
                 {
-                    byte[] newBytes = ArrayPool.Rent(MaxArraySize);
+                    byte[] newBytes = ArrayPool.Rent(_newBufferSize);
                     currentBuffer = new BufferPartition
                     {
                         Buffer = newBytes,
@@ -344,12 +231,11 @@ public override void Write(byte[] buffer, int offset, int count)
                     BufferSet.Add(currentBuffer);
                 }

-                int copied = Math.Min(currentBuffer.Buffer.Length - currentBuffer.DataLength, count);
-                Array.Copy(buffer, offset, currentBuffer.Buffer, currentBuffer.DataLength, copied);
+                int copied = Math.Min(currentBuffer.Buffer.Length - currentBuffer.DataLength, writeBuffer.Length);
+                writeBuffer.Slice(0, Math.Min(copied, writeBuffer.Length)).CopyTo(new Span<byte>(currentBuffer.Buffer, currentBuffer.DataLength, copied));
                 currentBuffer.DataLength += copied;
-                count -= copied;
-                offset += copied;
-
                 Position += copied;
+                writeBuffer = writeBuffer.Slice(copied);
             }
         }
diff --git a/sdk/storage/Azure.Storage.Common/src/Shared/StorageWriteStream.cs b/sdk/storage/Azure.Storage.Common/src/Shared/StorageWriteStream.cs
index ca0880e95fa9..875ceed55bd9 100644
--- a/sdk/storage/Azure.Storage.Common/src/Shared/StorageWriteStream.cs
+++ b/sdk/storage/Azure.Storage.Common/src/Shared/StorageWriteStream.cs
@@ -93,7 +93,7 @@ protected StorageWriteStream(
             {
                 _buffer = new PooledMemoryStream(
                     arrayPool: _bufferPool,
-                    maxArraySize: (int)Math.Min(Constants.MB, bufferSize));
+                    bufferSize: (int)Math.Min(Constants.MB, bufferSize));
                 _accumulatedDisposables.Add(_buffer);
             }
             _bufferChecksumer = ContentHasher.GetHasherFromAlgorithmId(_checksumAlgorithm);
diff --git a/sdk/storage/Azure.Storage.Common/src/Shared/StreamExtensions.cs b/sdk/storage/Azure.Storage.Common/src/Shared/StreamExtensions.cs
index 31f121d414ea..e899ba2eb695 100644
--- a/sdk/storage/Azure.Storage.Common/src/Shared/StreamExtensions.cs
+++ b/sdk/storage/Azure.Storage.Common/src/Shared/StreamExtensions.cs
@@ -1,6 +1,8 @@
 // Copyright (c) Microsoft Corporation. All rights reserved.
 // Licensed under the MIT License.

+using System;
+using System.Buffers;
 using System.IO;
 using System.Threading;
 using System.Threading.Tasks;
@@ -12,6 +14,8 @@ namespace Azure.Storage
     /// </summary>
     internal static partial class StreamExtensions
     {
+        private const int DefaultCopyBufferSize = 81920; // default from .NET documentation
+
         public static async Task<int> ReadInternal(
             this Stream stream,
             byte[] buffer,
@@ -56,7 +60,7 @@ public static Task CopyToInternal(
             => CopyToInternal(
                 src,
                 dest,
-                bufferSize: 81920, // default from .NET documentation
+                DefaultCopyBufferSize,
                 async,
                 cancellationToken);

@@ -95,5 +99,43 @@ public static async Task CopyToInternal(
                 src.CopyTo(dest, bufferSize);
             }
         }
+
+        public static async Task<long> CopyToExactInternal(
+            this Stream src,
+            Stream dst,
+            long count,
+            bool async,
+            CancellationToken cancellationToken)
+            => await CopyToExactInternal(
+                src,
+                dst,
+                count,
+                DefaultCopyBufferSize,
+                async,
+                cancellationToken)
+                .ConfigureAwait(false);
+
+        public static async Task<long> CopyToExactInternal(
+            this Stream src,
+            Stream dst,
+            long count,
+            int copyBufferSize,
+            bool async,
+            CancellationToken cancellationToken)
+        {
+            using IDisposable _ = ArrayPool<byte>.Shared.RentDisposable(copyBufferSize, out byte[] copyBuffer);
+            long totalCopied = 0;
+            while (totalCopied < count)
+            {
+                int read = await src.ReadInternal(copyBuffer, 0, (int)Math.Min(count - totalCopied, copyBuffer.Length), async, cancellationToken).ConfigureAwait(false);
+                if (read == 0)
+                {
+                    break;
+                }
+                await dst.WriteInternal(copyBuffer, 0, read, async, cancellationToken).ConfigureAwait(false);
+                totalCopied += read;
+            }
+            return totalCopied;
+        }
     }
 }
diff --git a/sdk/storage/Azure.Storage.Common/tests/PooledMemoryStreamTests.cs b/sdk/storage/Azure.Storage.Common/tests/PooledMemoryStreamTests.cs
index 054637b6eb64..a69d6223265a 100644
--- a/sdk/storage/Azure.Storage.Common/tests/PooledMemoryStreamTests.cs
+++ b/sdk/storage/Azure.Storage.Common/tests/PooledMemoryStreamTests.cs
@@ -44,11 +44,13 @@ public void PredictableStreamIsPredictable()
         [TestCase(Constants.KB, 256)] // buffer split size smaller than data
         [TestCase(Constants.KB, 2 * Constants.KB)] // buffer split size larger than data.
         [TestCase(Constants.KB + 11, 256)] // content doesn't line up with buffers (extremely unlikely any array pool implementation will add exactly 11 bytes more than requested across 4 buffers)
-        public async Task ReadStream(int dataSize, int bufferPartitionSize)
+        public void ReadStream(int dataSize, int bufferPartitionSize)
         {
             PredictableStream originalStream = new PredictableStream();
-            PooledMemoryStream arrayPoolStream = await PooledMemoryStream.BufferStreamPartitionInternal(originalStream, dataSize, dataSize, _pool, bufferPartitionSize, true, default);
+            PooledMemoryStream arrayPoolStream = new(_pool, bufferPartitionSize);
+            originalStream.CopyToExactInternal(arrayPoolStream, dataSize, async: false, cancellationToken: default).EnsureCompleted();
             originalStream.Position = 0;
+            arrayPoolStream.Position = 0;

             byte[] originalStreamData = new byte[dataSize];
             byte[] poolStreamData = new byte[dataSize];
@@ -67,7 +69,8 @@ public void StreamCanHoldLongData()
             const long dataSize = (long)int.MaxValue + Constants.MB;
             const int bufferPartitionSize = 512 * Constants.MB;
             PredictableStream originalStream = new PredictableStream();
-            PooledMemoryStream arrayPoolStream = PooledMemoryStream.BufferStreamPartitionInternal(originalStream, dataSize, dataSize, _pool, bufferPartitionSize, false, default).EnsureCompleted();
+            PooledMemoryStream arrayPoolStream = new(_pool, bufferPartitionSize);
+            originalStream.CopyToExactInternal(arrayPoolStream, dataSize, async: false, cancellationToken: default).EnsureCompleted();
             originalStream.Position = 0;

             // assert it holds the correct amount of data. other tests assert data validity and it's so expensive to do that here.
diff --git a/sdk/storage/Azure.Storage.Common/tests/StorageWriteStreamTests.cs b/sdk/storage/Azure.Storage.Common/tests/StorageWriteStreamTests.cs
index 564bb72ea9af..e793638f2b76 100644
--- a/sdk/storage/Azure.Storage.Common/tests/StorageWriteStreamTests.cs
+++ b/sdk/storage/Azure.Storage.Common/tests/StorageWriteStreamTests.cs
@@ -36,7 +36,8 @@ public async Task BasicTest()
             Mock<PooledMemoryStream> mockBuffer = new(
                 MockBehavior.Loose,
                 ArrayPool<byte>.Shared,
-                Constants.MB)
+                Constants.MB,
+                default(int?))
             {
                 CallBase = true,
             };
@@ -94,7 +95,8 @@ public async Task NonAlignedWrites()
             Mock<PooledMemoryStream> mockBuffer = new(
                 MockBehavior.Loose,
                 ArrayPool<byte>.Shared,
-                Constants.MB)
+                Constants.MB,
+                default(int?))
             {
                 CallBase = true,
             };
@@ -149,7 +151,8 @@ public async Task WritesLargerThanBufferNonAligned()
             Mock<PooledMemoryStream> mockBuffer = new(
                 MockBehavior.Loose,
                 ArrayPool<byte>.Shared,
-                Constants.MB)
+                Constants.MB,
+                default(int?))
             {
                 CallBase = true,
             };
@@ -198,7 +201,8 @@ public async Task ErrorsInCommitCleanupArrayPoolOnDispose()
             Mock<PooledMemoryStream> mockBuffer = new(
                 MockBehavior.Loose,
                 ArrayPool<byte>.Shared,
-                Constants.MB)
+                Constants.MB,
+                default(int?))
             {
                 CallBase = true,
             };
diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/Azure.Storage.DataMovement.Blobs.csproj b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/Azure.Storage.DataMovement.Blobs.csproj
index 5bc76b862acf..eb0172f0267c 100644
--- a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/Azure.Storage.DataMovement.Blobs.csproj
+++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/Azure.Storage.DataMovement.Blobs.csproj
@@ -31,6 +31,7 @@
+
diff --git a/sdk/storage/Azure.Storage.Files.DataLake/tests/DataLakePartitionedUploaderTests.cs b/sdk/storage/Azure.Storage.Files.DataLake/tests/DataLakePartitionedUploaderTests.cs
index 799cfbc12cc6..06244ff43b48 100644
--- a/sdk/storage/Azure.Storage.Files.DataLake/tests/DataLakePartitionedUploaderTests.cs
+++ b/sdk/storage/Azure.Storage.Files.DataLake/tests/DataLakePartitionedUploaderTests.cs
@@ -78,7 +78,7 @@ public async Task UploadsStreamInBlocksIfLengthNotAvailable()

             Assert.AreEqual(1, sink.Appended.Count);
             Assert.AreEqual(s_response, info);
-            Assert.AreEqual(2, testPool.TotalRents); // while conceptually there is one rental, the second rental occurs upon checking for stream end on a Read() call
+            Assert.AreEqual(1, testPool.TotalRents);
             Assert.AreEqual(0, testPool.CurrentCount);
             AssertAppended(sink, content);
         }
@@ -123,7 +123,7 @@ public async Task UploadsStreamInBlocksUnderSize()
             var uploader = new PartitionedUploader<DataLakeFileUploadOptions, PathInfo>(
                 DataLakeFileClient.GetPartitionedUploaderBehaviors(clientMock.Object),
-                new StorageTransferOptions() { MaximumTransferLength = 20 },
+                new StorageTransferOptions() { MaximumTransferLength = 10 },
                 transferValidation: s_validationNone,
                 arrayPool: testPool);
             Response<PathInfo> info = await InvokeUploadAsync(uploader, content);

@@ -132,7 +132,7 @@ public async Task UploadsStreamInBlocksUnderSize()

             Assert.AreEqual(s_response, info);
             AssertAppended(sink, content);

-            Assert.AreEqual(3, testPool.TotalRents);
+            Assert.AreEqual(2, testPool.TotalRents);
             Assert.AreEqual(0, testPool.CurrentCount);
         }
@@ -160,7 +160,7 @@ public async Task MergesLotsOfSmallBlocks()

             Assert.AreEqual(1, sink.Appended.Count);
             Assert.AreEqual(s_response, info);
-            Assert.AreEqual(2, testPool.TotalRents); // while conceptually there is one rental, the second rental occurs upon checking for stream end on a Read() call
+            Assert.AreEqual(1, testPool.TotalRents);
             Assert.AreEqual(0, testPool.CurrentCount);
             AssertAppended(sink, content);
         }
@@ -189,7 +189,7 @@ public async Task SmallMaxWriteSize()

             Assert.AreEqual(s_response, info);
             Assert.AreEqual(0, testPool.CurrentCount);
-            Assert.AreEqual(41, testPool.TotalRents);
+            Assert.AreEqual(20, testPool.TotalRents);
             AssertAppended(sink, content);

             foreach ((byte[] bytes, _) in sink.Appended.Values)
@@ -216,7 +216,7 @@ public async Task MergesBlocksUntilTheyReachOverHalfMaxSize()
             var uploader = new PartitionedUploader<DataLakeFileUploadOptions, PathInfo>(
                 DataLakeFileClient.GetPartitionedUploaderBehaviors(clientMock.Object),
-                new StorageTransferOptions() { MaximumTransferLength = 20 },
+                new StorageTransferOptions() { MaximumTransferLength = 10 },
                 transferValidation: s_validationNone,
                 arrayPool: testPool);
             Response<PathInfo> info = await InvokeUploadAsync(uploader, content);

@@ -225,7 +225,7 @@ public async Task MergesBlocksUntilTheyReachOverHalfMaxSize()
             Assert.AreEqual(2, sink.Appended.Count);
             // First two should be merged
             CollectionAssert.AreEqual(new byte[] { 0, 0, 0, 0, 0, 1, 1, 1, 1, 1 }, sink.Appended[0].Data);
             Assert.AreEqual(s_response, info);
-            Assert.AreEqual(3, testPool.TotalRents);
+            Assert.AreEqual(2, testPool.TotalRents);
             Assert.AreEqual(0, testPool.CurrentCount);
             AssertAppended(sink, content);
         }