diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeReader.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeReader.cs index 0b4d13593a03f6..76864742a8875d 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeReader.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeReader.cs @@ -16,7 +16,6 @@ internal sealed class StreamPipeReader : PipeReader private CancellationTokenSource? _internalTokenSource; private bool _isReaderCompleted; - private bool _isStreamCompleted; private BufferSegment? _readHead; private int _readIndex; @@ -231,12 +230,6 @@ private ValueTask ReadInternalAsync(int? minimumSize, CancellationTo } } - if (_isStreamCompleted) - { - ReadResult completedResult = new ReadResult(buffer: default, isCanceled: false, isCompleted: true); - return new ValueTask(completedResult); - } - return Core(this, minimumSize, tokenSource, cancellationToken); #if NET @@ -253,6 +246,7 @@ static async ValueTask Core(StreamPipeReader reader, int? minimumSiz using (reg) { var isCanceled = false; + bool isCompleted = false; try { // This optimization only makes sense if we don't have anything buffered @@ -277,7 +271,7 @@ static async ValueTask Core(StreamPipeReader reader, int? minimumSiz if (length == 0) { - reader._isStreamCompleted = true; + isCompleted = true; break; } } while (minimumSize != null && reader._bufferedBytes < minimumSize); @@ -302,7 +296,7 @@ static async ValueTask Core(StreamPipeReader reader, int? minimumSiz } } - return new ReadResult(reader.GetCurrentReadOnlySequence(), isCanceled, reader._isStreamCompleted); + return new ReadResult(reader.GetCurrentReadOnlySequence(), isCanceled, isCompleted); } } } @@ -362,11 +356,6 @@ public override async Task CopyToAsync(PipeWriter destination, CancellationToken } } - if (_isStreamCompleted) - { - return; - } - await InnerStream.CopyToAsync(destination, tokenSource.Token).ConfigureAwait(false); } catch (OperationCanceledException) @@ -423,11 +412,6 @@ public override async Task CopyToAsync(Stream destination, CancellationToken can } } - if (_isStreamCompleted) - { - return; - } - await InnerStream.CopyToAsync(destination, tokenSource.Token).ConfigureAwait(false); } catch (OperationCanceledException) @@ -465,7 +449,7 @@ public override bool TryRead(out ReadResult result) private bool TryReadInternal(CancellationTokenSource source, out ReadResult result) { bool isCancellationRequested = source.IsCancellationRequested; - if (isCancellationRequested || _bufferedBytes > 0 && (!_examinedEverything || _isStreamCompleted)) + if (isCancellationRequested || (_bufferedBytes > 0 && !_examinedEverything)) { if (isCancellationRequested) { @@ -474,7 +458,7 @@ private bool TryReadInternal(CancellationTokenSource source, out ReadResult resu ReadOnlySequence buffer = GetCurrentReadOnlySequence(); - result = new ReadResult(buffer, isCancellationRequested, _isStreamCompleted); + result = new ReadResult(buffer, isCancellationRequested, false); return true; } diff --git a/src/libraries/System.IO.Pipelines/tests/StreamPipeReaderTests.cs b/src/libraries/System.IO.Pipelines/tests/StreamPipeReaderTests.cs index f9fb1f87dbafba..9fa3e33945b932 100644 --- a/src/libraries/System.IO.Pipelines/tests/StreamPipeReaderTests.cs +++ b/src/libraries/System.IO.Pipelines/tests/StreamPipeReaderTests.cs @@ -151,10 +151,14 @@ public async Task ReadWithDifferentSettings(int bytesInBuffer, int bufferSize, i [Fact] public async Task ReadAsyncAfterReceivingCompletedReadResultDoesNotThrow() { - var stream = new ThrowAfterZeroByteReadStream(); + byte[] helloBytes = "Hello World"u8.ToArray(); + var stream = new MemoryStream(helloBytes); PipeReader reader = PipeReader.Create(stream); ReadResult readResult = await reader.ReadAsync(); - Assert.True(readResult.Buffer.IsEmpty); + Assert.Equal(helloBytes.Length, readResult.Buffer.Length); + reader.AdvanceTo(readResult.Buffer.End); + + readResult = await reader.ReadAsync(); Assert.True(readResult.IsCompleted); reader.AdvanceTo(readResult.Buffer.End); @@ -166,12 +170,38 @@ public async Task ReadAsyncAfterReceivingCompletedReadResultDoesNotThrow() } [Fact] - public async Task BufferingDataPastEndOfStreamCanBeReadAgain() + public async Task ReadAsyncAfterReceivingCompletedReadResultAndResettingStreamPositionWorks() { byte[] helloBytes = "Hello World"u8.ToArray(); - var stream = new ThrowAfterZeroByteReadStream(helloBytes); + var stream = new MemoryStream(helloBytes); PipeReader reader = PipeReader.Create(stream); + ReadResult readResult = await reader.ReadAsync(); + Assert.Equal(helloBytes, readResult.Buffer.ToArray()); + Assert.False(readResult.IsCompleted); + reader.AdvanceTo(readResult.Buffer.End); + + readResult = await reader.ReadAsync(); + Assert.True(readResult.IsCompleted); + + // Reset the stream position to the beginning + stream.Position = 0; + + readResult = await reader.ReadAsync(); + Assert.Equal(helloBytes, readResult.Buffer.ToArray()); + Assert.False(readResult.IsCompleted); + reader.AdvanceTo(readResult.Buffer.End); + readResult = await reader.ReadAsync(); + Assert.True(readResult.IsCompleted); + reader.Complete(); + } + + [Fact] + public async Task BufferingDataPastEndOfStreamCanBeReadAgain() + { + byte[] helloBytes = "Hello World"u8.ToArray(); + var stream = new MemoryStream(helloBytes); + PipeReader reader = PipeReader.Create(stream); ReadResult readResult = await reader.ReadAsync(); ReadOnlySequence buffer = readResult.Buffer; @@ -669,50 +699,6 @@ public override ValueTask ReadAsync(Memory buffer, CancellationToken #endif } - private class ThrowAfterZeroByteReadStream : MemoryStream - { - public ThrowAfterZeroByteReadStream() - { - - } - - public ThrowAfterZeroByteReadStream(byte[] buffer) : base(buffer) - { - - } - - private bool _throwOnNextCallToRead; - public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) - { - if (_throwOnNextCallToRead) - { - throw new Exception(); - } - var bytes = await base.ReadAsync(buffer, offset, count, cancellationToken); - if (bytes == 0) - { - _throwOnNextCallToRead = true; - } - return bytes; - } - -#if NET - public override async ValueTask ReadAsync(Memory destination, CancellationToken cancellationToken = default) - { - if (_throwOnNextCallToRead) - { - throw new Exception(); - } - var bytes = await base.ReadAsync(destination, cancellationToken); - if (bytes == 0) - { - _throwOnNextCallToRead = true; - } - return bytes; - } -#endif - } - private class DisposalTrackingStream : MemoryStream { public bool DisposeCalled { get; private set; }